fix: Fix memory leak with dlpack when using python Tensor objects #421

Merged: 10 commits (Jan 6, 2025)
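
For context, the leak shows up when a numpy array is wrapped in a tritonserver.Tensor via DLPack and then re-exported to a consumer. A minimal reproduction sketch (illustrative, not part of the diff; it assumes the tritonserver Python package and numpy are installed, and it mirrors the new tests below, just with a smaller array):

import gc

import numpy
import tritonserver

for _ in range(10):
    owner = numpy.ones(2**20)
    tensor = tritonserver.Tensor.from_dlpack(owner)  # Tensor borrows the numpy buffer via DLPack
    array = numpy.from_dlpack(tensor)                # consumer view re-exported through __dlpack__
    del owner, tensor, array

gc.collect()
leaked = [obj for obj in gc.get_objects() if isinstance(obj, tritonserver.Tensor)]
print(f"Tensor objects still alive: {len(leaked)}")  # non-zero before this fix, zero afterwards
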
98 changes: 98 additions & 0 deletions python/test/test_api.py
@@ -24,9 +24,17 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio
import copy
import gc
import json
import os
import shutil
import sys
import time
import unittest
from collections import Counter
from contextlib import contextmanager

import numpy
import pytest
@@ -296,6 +304,96 @@ def test_tensor_from_numpy(self):
numpy.testing.assert_array_equal(torch_tensor.numpy(), cpu_array)
assert torch_tensor.data_ptr() == cpu_array.ctypes.data

async def _tensor_from_numpy(self):
owner = numpy.ones(2**27)
tensor = tritonserver.Tensor.from_dlpack(owner)
array = numpy.from_dlpack(tensor)
del owner
del tensor
del array
await asyncio.sleep(0.1)

async def _async_test_runs(self):
tasks = []
for _ in range(100):
tasks.append(asyncio.create_task(self._tensor_from_numpy()))
try:
await asyncio.wait(tasks)
except Exception as e:
print(e)

@staticmethod
@contextmanager
def object_collector():
gc.collect()
objects_before = gc.get_objects()
yield
objects_after = gc.get_objects()
new_objects = [type(x) for x in objects_after[len(objects_before) :]]
tensor_objects = [
x for x in objects_after if isinstance(x, tritonserver.Tensor)
]
if tensor_objects:
print("Tensor objects")
print(len(tensor_objects))
print(type(tensor_objects[-1].memory_buffer.owner))
print(
f"\nTotal Collected Objects ({len(new_objects)}) {Counter(new_objects)}"
)
assert len(tensor_objects) == 0, "Leaked Tensors"

def test_cpu_memory_leak_async(self):
with TestTensor.object_collector():
asyncio.run(self._async_test_runs())

def test_cpu_memory_leak_sync(self):
with TestTensor.object_collector():
for _ in range(100):
owner = numpy.ones(2**27)
tensor = tritonserver.Tensor.from_dlpack(owner)
array = numpy.from_dlpack(tensor)
del owner
del tensor
del array

@pytest.mark.skipif(cupy is None, reason="Skipping gpu memory, cupy not installed")
def test_gpu_memory_leak(self):
with TestTensor.object_collector():
for _ in range(100):
owner = cupy.ones(2**27)
tensor = tritonserver.Tensor.from_dlpack(owner)
array = cupy.from_dlpack(tensor)
del owner
del tensor
del array

def test_reference_counts(self):
with TestTensor.object_collector():
owner = numpy.ones(2**27)
owner_data = owner.ctypes.data
assert sys.getrefcount(owner) - 1 == 1, "Invalid Count"

tensor = tritonserver.Tensor.from_dlpack(owner)
assert sys.getrefcount(owner) - 1 == 2, "Invalid Count"
assert sys.getrefcount(tensor) - 1 == 1, "Invalid Count"
del owner

numpy_array = numpy.from_dlpack(tensor)
assert owner_data == numpy_array.ctypes.data
assert sys.getrefcount(tensor) - 1 == 2, "Invalid Count"
assert sys.getrefcount(numpy_array) - 1 == 1, "Invalid Count"

tensor.shape = [2, 2**26]

assert numpy_array.shape == (2**27,), "Invalid Shape"

numpy_array_2 = numpy.from_dlpack(tensor)
del tensor
assert owner_data == numpy_array.ctypes.data
assert numpy_array_2.shape == (2, 2**26)
del numpy_array
del numpy_array_2


class TestServer:
def test_not_started(self):
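
A side note on the "- 1" in the refcount assertions of test_reference_counts above: sys.getrefcount reports one extra reference for the temporary argument it holds during the call, so the test subtracts it before comparing. A standalone illustration (not part of the diff):

import sys

x = object()
print(sys.getrefcount(x))      # typically 2 on CPython: the binding x plus getrefcount's own argument
y = x
print(sys.getrefcount(x) - 1)  # 2 after discounting the call's reference: the bindings x and y
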
90 changes: 55 additions & 35 deletions python/tritonserver/_api/_tensor.py
@@ -217,23 +217,8 @@ def __dlpack__(self, *, stream=None):

         self._sync_on_requested_stream(stream)

-        dl_managed_tensor = Tensor._create_managed_tensor()
-        dl_managed_tensor.dl_tensor.data = self.data_ptr
-        dl_managed_tensor.dl_tensor.device = DLDevice(
-            TRITON_MEMORY_TYPE_TO_DLPACK_DEVICE_TYPE[self.memory_type],
-            self.memory_type_id,
-        )
+        dl_managed_tensor = self._create_managed_tensor()

-        dl_managed_tensor.dl_tensor.dtype = TRITON_TO_DLPACK_DTYPE[self.data_type]
-        dl_managed_tensor.dl_tensor.ndim = len(self.shape)
-        dl_managed_tensor.dl_tensor.shape = (ctypes.c_int64 * len(self.shape))(
-            *self.shape
-        )
-        dl_managed_tensor.dl_tensor.strides = ctypes.POINTER(ctypes.c_int64)()
-        dl_managed_tensor.dl_tensor.byte_offset = 0
-        dl_managed_tensor.deleter = Tensor._managed_tensor_deleter
-
-        self._set_dlpack_manager_ctx(dl_managed_tensor)
         pycapsule = ctypes.pythonapi.PyCapsule_New(
             ctypes.byref(dl_managed_tensor),
             c_str_dltensor,
@@ -600,26 +585,39 @@ def _from_numpy(obj: numpy.ndarray | numpy.generic) -> Tensor:
             size=obj.itemsize * obj.size,
             owner=obj,
         )

         return Tensor(data_type, shape, memory_buffer)

-    @staticmethod
-    def _create_managed_tensor():
+    def _create_managed_tensor(self) -> DLManagedTensor:
+        # Allocates space for a managed tensor object
+        # and fills in the fields
+        #
+        # To ensure the lifetime of the managed tensor we create a
+        # context object that includes a newly created shape array and a
+        # reference to self
+
         size = ctypes.c_size_t(ctypes.sizeof(DLManagedTensor))
         address = ctypes.pythonapi.PyMem_RawMalloc(size)
-        return DLManagedTensor.from_address(address)
+        dl_managed_tensor = DLManagedTensor.from_address(address)
+        dl_managed_tensor.dl_tensor.data = self.data_ptr
+        dl_managed_tensor.dl_tensor.device = DLDevice(
+            TRITON_MEMORY_TYPE_TO_DLPACK_DEVICE_TYPE[self.memory_type],
+            self.memory_type_id,
+        )
+        dl_managed_tensor.dl_tensor.dtype = TRITON_TO_DLPACK_DTYPE[self.data_type]
+        dl_managed_tensor.dl_tensor.ndim = len(self.shape)
+        manager_ctx = _ManagerCtx(self)
+        dl_managed_tensor.dl_tensor.shape = manager_ctx.shape
+        dl_managed_tensor.dl_tensor.strides = manager_ctx.strides
+        dl_managed_tensor.dl_tensor.byte_offset = 0
+        dl_managed_tensor.deleter = Tensor._managed_tensor_deleter
+        dl_managed_tensor.manager_ctx = manager_ctx.reference()
+        return dl_managed_tensor

     @staticmethod
     @ctypes.CFUNCTYPE(None, ctypes.c_void_p)
     def _managed_tensor_deleter(handle: int) -> None:
         dl_managed_tensor = DLManagedTensor.from_address(handle)
-        tensor_obj_ptr = ctypes.cast(
-            dl_managed_tensor.manager_ctx, ctypes.POINTER(ctypes.py_object)
-        )
-        tensor_obj = tensor_obj_ptr.contents
-        ctypes.pythonapi.Py_DecRef(tensor_obj)
-        shape_obj = ctypes.py_object(dl_managed_tensor.dl_tensor.shape)
-        ctypes.pythonapi.Py_DecRef(shape_obj)
+        _ManagerCtx.release(dl_managed_tensor.manager_ctx)
         ctypes.pythonapi.PyMem_RawFree(handle)

     @staticmethod
@@ -639,14 +637,36 @@ def _pycapsule_deleter(handle: ctypes.c_void_p) -> None:
print(f"Exception occurred while deleting capsule: {e}")
raise e

def _set_dlpack_manager_ctx(self, dl_managed_tensor):
tensor_obj = ctypes.py_object(self)
tensor_obj_ptr = ctypes.pointer(tensor_obj)
dl_managed_tensor.manager_ctx = ctypes.cast(tensor_obj_ptr, ctypes.c_void_p)
shape_obj = ctypes.py_object(dl_managed_tensor.dl_tensor.shape)
ctypes.pythonapi.Py_IncRef(tensor_obj)
ctypes.pythonapi.Py_IncRef(shape_obj)

_from_converters: ClassVar[dict[type, Callable[[Any], Tensor]]] = dict(
{numpy.ndarray: _from_numpy, numpy.generic: _from_numpy, list: _from_list},
)


class _ManagerCtx:
# To ensure the lifetime of the managed tensor we create a
# context object that includes a newly created shape array and a
# reference to self

def __init__(self, tensor: Tensor) -> None:
self._tensor = tensor
self.shape = (ctypes.c_int64 * len(tensor.shape))(*tensor.shape)
self.strides = ctypes.POINTER(ctypes.c_int64)()

def reference(self) -> ctypes.c_void_p:
py_obj = ctypes.py_object(self)
ctypes.pythonapi.Py_IncRef(py_obj)

# Note: Could not find a direct way to cast a python object
# to a c_void_p. The mechanism is to either use id(self) or
# cast as described here:
#
# https://groups.google.com/g/dev-python/c/QRRqVC7gkf4/m/zH7l1gTXBwAJ
#
# To avoid relying on the behavior of id() we use the casting mechanism

return ctypes.POINTER(ctypes.c_void_p)(py_obj)[0]

@staticmethod
def release(reference: ctypes.c_void_p) -> None:
py_obj = ctypes.cast(reference, ctypes.py_object)
ctypes.pythonapi.Py_DecRef(py_obj)
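
For orientation, a standalone sketch of the ctypes round trip that reference() and release() above rely on (illustrative, not from the PR; the Payload class is a hypothetical stand-in for _ManagerCtx, and the address is produced here with an equivalent ctypes.cast rather than the direct POINTER(c_void_p) construction used in reference()):

import ctypes


class Payload:
    """Illustrative stand-in for _ManagerCtx."""


payload = Payload()
py_obj = ctypes.py_object(payload)
ctypes.pythonapi.Py_IncRef(py_obj)  # pin the object while only a raw pointer will reference it

# Reinterpret the pointer-sized py_object slot as a plain address value,
# the same idea reference() uses to produce a c_void_p for manager_ctx.
handle = ctypes.cast(ctypes.pointer(py_obj), ctypes.POINTER(ctypes.c_void_p)).contents.value

del payload, py_obj  # the manual IncRef above still keeps the object alive

# Later, e.g. inside the DLPack deleter callback, the handle is turned back into the object:
recovered = ctypes.cast(handle, ctypes.py_object)
assert isinstance(recovered.value, Payload)
ctypes.pythonapi.Py_DecRef(recovered)  # balance the IncRef so the object can be collected
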