From c9bcbd9f8b1ccff974a3064313a016eef718b1b2 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 20 Jan 2025 11:46:30 +0530 Subject: [PATCH 01/17] Reuse response allocations --- src/grpc/infer_handler.h | 112 ++++++++++++++++++++++++++++----------- 1 file changed, 82 insertions(+), 30 deletions(-) diff --git a/src/grpc/infer_handler.h b/src/grpc/infer_handler.h index 86428a514e..decdff81b1 100644 --- a/src/grpc/infer_handler.h +++ b/src/grpc/infer_handler.h @@ -1,4 +1,4 @@ -// Copyright 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -102,9 +102,9 @@ struct RequestReleasePayload final { // // ResponseQueue // -// A simple queue holding the responses to be written. Uses a -// vector of persistent message objects to prevent allocating -// memory for each response to be written. +// This class implements a queue to manage responses that need to be written. +// It internally uses a reusable pool of persistent message objects to avoid +// allocating memory for each response individually. // template class ResponseQueue { @@ -113,19 +113,29 @@ class ResponseQueue { ~ResponseQueue() { + // Delete all responses in the reusable pool + for (auto response : reusable_pool_) { + delete response; + } + + // Delete all responses currently in the queue for (auto response : responses_) { delete response; } } - // Resets the queue + // Resets the queue to its initial state void Reset() { + std::lock_guard lock(mtx_); alloc_count_ = 0; ready_count_ = 0; - current_index_ = 0; - for (auto response : responses_) { - response->Clear(); + pop_count_ = 0; + + while (!responses_.empty()) { + responses_.front()->Clear(); + reusable_pool_.push_back(responses_.front()); + responses_.pop_front(); } } @@ -137,17 +147,27 @@ class ResponseQueue { std::lock_guard lock(mtx_); alloc_count_ = 1; if (responses_.size() < 1) { - responses_.push_back(new ResponseType()); + if (!reusable_pool_.empty()) { + responses_.push_back(reusable_pool_.front()); + reusable_pool_.pop_front(); + } else { + responses_.push_back(new ResponseType()); + } } return responses_[0]; } - // Allocates a response on the head of the queue + // Allocates a response at the end of the queue void AllocateResponse() { std::lock_guard lock(mtx_); alloc_count_++; - if (responses_.size() < alloc_count_) { + + // Use a response from the reusable pool if available + if (!reusable_pool_.empty()) { + responses_.push_back(reusable_pool_.front()); + reusable_pool_.pop_front(); + } else { responses_.push_back(new ResponseType()); } } @@ -156,12 +176,15 @@ class ResponseQueue { ResponseType* GetLastAllocatedResponse() { std::lock_guard lock(mtx_); - if (responses_.size() < alloc_count_) { + + // Ensure that the requested response has been allocated + if ((responses_.size() + pop_count_) < alloc_count_) { LOG_ERROR << "[INTERNAL] Attempting to access the response not yet allocated"; return nullptr; } - return responses_[alloc_count_ - 1]; + + return responses_.back(); } // Marks the next non-ready response complete @@ -178,43 +201,71 @@ class ResponseQueue { return true; } - // Gets the current response from the tail of - // the queue. + // Gets the current response from the front of the queue ResponseType* GetCurrentResponse() { std::lock_guard lock(mtx_); - if (current_index_ >= ready_count_) { + if (pop_count_ >= ready_count_) { LOG_ERROR << "[INTERNAL] Attempting to access current response when it " "is not ready"; return nullptr; } - return responses_[current_index_]; + if (responses_.empty()) { + LOG_ERROR << "[INTERNAL] No responses are available in the queue."; + return nullptr; + } + + return responses_.front(); } // Gets the response at the specified index ResponseType* GetResponseAt(const uint32_t index) { std::lock_guard lock(mtx_); + + // Check if the index is valid for allocated responses if (index >= alloc_count_) { LOG_ERROR << "[INTERNAL] Attempting to access response which is not yet " "allocated"; return nullptr; } - return responses_[index]; + if (index < pop_count_) { + LOG_ERROR << "[INTERNAL] Attempting to access a response that has " + "already been removed from the queue."; + return nullptr; + } + + // Adjust index based on number of popped responses to get actual index in + // 'responses_' + return responses_[index - pop_count_]; } - // Pops the response from the tail of the queue + // Removes the current response from the front of the queue void PopResponse() { std::lock_guard lock(mtx_); - current_index_++; + + // Ensure there are responses in the queue to pop + if (responses_.empty()) { + LOG_ERROR << "[INTERNAL] No responses in the queue to pop."; + return; + } + + // Clear and move the current response to the reusable pool + auto response = responses_.front(); + response->Clear(); + reusable_pool_.push_back(response); + responses_.pop_front(); + pop_count_++; } // Returns whether the queue is empty bool IsEmpty() { std::lock_guard lock(mtx_); - return ((alloc_count_ == ready_count_) && (alloc_count_ == current_index_)); + return ( + (alloc_count_ == ready_count_) && (alloc_count_ == pop_count_) && + responses_.empty()); } // Returns whether the queue has responses @@ -222,20 +273,21 @@ class ResponseQueue { bool HasReadyResponse() { std::lock_guard lock(mtx_); - return (ready_count_ > current_index_); + return (ready_count_ > pop_count_); } private: - std::vector responses_; + // Stores responses that need to be written. The front of the queue indicates + // the current response, while the back indicates the last allocated response. + std::deque responses_; + // Stores completed responses that can be reused + std::deque reusable_pool_; std::mutex mtx_; - // There are three indices to track the responses in the queue - // Tracks the allocated response - uint32_t alloc_count_; - // Tracks the response that is ready to be written - uint32_t ready_count_; - // Tracks the response next in the queue to be written - uint32_t current_index_; + // Three counters are used to track and manage responses in the queue + uint32_t alloc_count_; // Number of allocated responses + uint32_t ready_count_; // Number of ready-to-write responses + uint32_t pop_count_; // Number of removed responses from the queue }; From 2fba1ddebfbf89ff9f1a10aff4ce23aea167f93e Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 21 Jan 2025 20:12:26 +0530 Subject: [PATCH 02/17] ResponseQueue Threshold --- src/command_line_parser.cc | 14 ++++++++++++++ src/grpc/grpc_server.cc | 10 ++++++---- src/grpc/grpc_server.h | 1 + src/grpc/infer_handler.h | 32 +++++++++++++++++++++++--------- src/grpc/stream_infer_handler.h | 4 ++-- 5 files changed, 46 insertions(+), 15 deletions(-) diff --git a/src/command_line_parser.cc b/src/command_line_parser.cc index 53a103d33b..5976912c1d 100644 --- a/src/command_line_parser.cc +++ b/src/command_line_parser.cc @@ -306,6 +306,7 @@ enum TritonOptionId { OPTION_GRPC_ADDRESS, OPTION_GRPC_HEADER_FORWARD_PATTERN, OPTION_GRPC_INFER_ALLOCATION_POOL_SIZE, + OPTION_GRPC_MAX_RESPONSE_POOL_SIZE, OPTION_GRPC_USE_SSL, OPTION_GRPC_USE_SSL_MUTUAL, OPTION_GRPC_SERVER_CERT, @@ -536,6 +537,11 @@ TritonParser::SetupOptions() "allocated for reuse. As long as the number of in-flight requests " "doesn't exceed this value there will be no allocation/deallocation of " "request/response objects."}); + grpc_options_.push_back( + {OPTION_GRPC_MAX_RESPONSE_POOL_SIZE, "grpc-max-response-pool-size", + Option::ArgInt, + "The maximum number of inference response objects that can remain " + "allocated in the pool at any given time."}); grpc_options_.push_back( {OPTION_GRPC_USE_SSL, "grpc-use-ssl", Option::ArgBool, "Use SSL authentication for GRPC requests. Default is false."}); @@ -1438,6 +1444,14 @@ TritonParser::Parse(int argc, char** argv) case OPTION_GRPC_INFER_ALLOCATION_POOL_SIZE: lgrpc_options.infer_allocation_pool_size_ = ParseOption(optarg); break; + case OPTION_GRPC_MAX_RESPONSE_POOL_SIZE: + lgrpc_options.max_response_pool_size_ = ParseOption(optarg); + if (lgrpc_options.max_response_pool_size_ <= 0) { + throw ParseException( + "Error: --grpc-max-response-pool-size must be greater " + "than 0."); + } + break; case OPTION_GRPC_USE_SSL: lgrpc_options.ssl_.use_ssl_ = ParseOption(optarg); break; diff --git a/src/grpc/grpc_server.cc b/src/grpc/grpc_server.cc index 74ec443ae6..f4e90a0c93 100644 --- a/src/grpc/grpc_server.cc +++ b/src/grpc/grpc_server.cc @@ -2395,8 +2395,8 @@ Server::Server( "ModelInferHandler", tritonserver_, trace_manager_, shm_manager_, &service_, model_infer_cq_.get(), options.infer_allocation_pool_size_ /* max_state_bucket_count */, - options.infer_compression_level_, restricted_kv, - options.forward_header_pattern_)); + options.max_response_pool_size_, options.infer_compression_level_, + restricted_kv, options.forward_header_pattern_)); } // Handler for streaming inference requests. Keeps one handler for streaming @@ -2405,8 +2405,8 @@ Server::Server( "ModelStreamInferHandler", tritonserver_, trace_manager_, shm_manager_, &service_, model_stream_infer_cq_.get(), options.infer_allocation_pool_size_ /* max_state_bucket_count */, - options.infer_compression_level_, restricted_kv, - options.forward_header_pattern_)); + options.max_response_pool_size_, options.infer_compression_level_, + restricted_kv, options.forward_header_pattern_)); } Server::~Server() @@ -2472,6 +2472,8 @@ Server::GetOptions(Options& options, UnorderedMapType& options_map) RETURN_IF_ERR(GetValue( options_map, "infer_allocation_pool_size", &options.infer_allocation_pool_size_)); + RETURN_IF_ERR(GetValue( + options_map, "max_response_pool_size", &options.max_response_pool_size_)); RETURN_IF_ERR(GetValue( options_map, "forward_header_pattern", &options.forward_header_pattern_)); diff --git a/src/grpc/grpc_server.h b/src/grpc/grpc_server.h index 89d8dc7388..8c942df060 100644 --- a/src/grpc/grpc_server.h +++ b/src/grpc/grpc_server.h @@ -89,6 +89,7 @@ struct Options { // requests doesn't exceed this value there will be no // allocation/deallocation of request/response objects. int infer_allocation_pool_size_{8}; + int max_response_pool_size_{INT_MAX}; RestrictedFeatures restricted_protocols_; std::string forward_header_pattern_; }; diff --git a/src/grpc/infer_handler.h b/src/grpc/infer_handler.h index decdff81b1..ef259a5d55 100644 --- a/src/grpc/infer_handler.h +++ b/src/grpc/infer_handler.h @@ -109,7 +109,11 @@ struct RequestReleasePayload final { template class ResponseQueue { public: - explicit ResponseQueue() { Reset(); } + explicit ResponseQueue(const size_t max_response_queue_size) + : max_response_queue_size_(max_response_queue_size) + { + Reset(); + } ~ResponseQueue() { @@ -160,7 +164,9 @@ class ResponseQueue { // Allocates a response at the end of the queue void AllocateResponse() { - std::lock_guard lock(mtx_); + std::unique_lock lock(mtx_); + cv_.wait( + lock, [this] { return responses_.size() < max_response_queue_size_; }); alloc_count_++; // Use a response from the reusable pool if available @@ -257,6 +263,8 @@ class ResponseQueue { reusable_pool_.push_back(response); responses_.pop_front(); pop_count_++; + + cv_.notify_one(); } // Returns whether the queue is empty @@ -282,6 +290,8 @@ class ResponseQueue { std::deque responses_; // Stores completed responses that can be reused std::deque reusable_pool_; + std::condition_variable cv_; + size_t max_response_queue_size_; std::mutex mtx_; // Three counters are used to track and manage responses in the queue @@ -1122,7 +1132,7 @@ class InferHandlerState { } explicit InferHandlerState( - TRITONSERVER_Server* tritonserver, + TRITONSERVER_Server* tritonserver, const size_t max_response_queue_size, const std::shared_ptr& context, Steps start_step = Steps::START) : tritonserver_(tritonserver), async_notify_state_(false) { @@ -1136,7 +1146,8 @@ class InferHandlerState { delay_response_completion_ms_ = ParseDebugVariable("TRITONSERVER_DELAY_RESPONSE_COMPLETION"); - response_queue_.reset(new ResponseQueue()); + response_queue_.reset( + new ResponseQueue(max_response_queue_size)); Reset(context, start_step); } @@ -1289,7 +1300,7 @@ class InferHandler : public HandlerBase { const std::string& name, const std::shared_ptr& tritonserver, ServiceType* service, ::grpc::ServerCompletionQueue* cq, - size_t max_state_bucket_count, + size_t max_state_bucket_count, size_t max_response_queue_size, std::pair restricted_kv, const std::string& header_forward_pattern); virtual ~InferHandler(); @@ -1326,7 +1337,8 @@ class InferHandler : public HandlerBase { } if (state == nullptr) { - state = new State(tritonserver, context, start_step); + state = new State( + tritonserver, max_response_queue_size_, context, start_step); } if (start_step == Steps::START) { @@ -1427,6 +1439,7 @@ class InferHandler : public HandlerBase { const size_t max_state_bucket_count_; std::vector state_bucket_; + const size_t max_response_queue_size_; std::pair restricted_kv_; std::string header_forward_pattern_; re2::RE2 header_forward_regex_; @@ -1440,11 +1453,12 @@ InferHandler:: const std::string& name, const std::shared_ptr& tritonserver, ServiceType* service, ::grpc::ServerCompletionQueue* cq, - size_t max_state_bucket_count, + size_t max_state_bucket_count, size_t max_response_queue_size, std::pair restricted_kv, const std::string& header_forward_pattern) : name_(name), tritonserver_(tritonserver), service_(service), cq_(cq), max_state_bucket_count_(max_state_bucket_count), + max_response_queue_size_(max_response_queue_size), restricted_kv_(restricted_kv), header_forward_pattern_(header_forward_pattern), header_forward_regex_(header_forward_pattern_) @@ -1600,12 +1614,12 @@ class ModelInferHandler const std::shared_ptr& shm_manager, inference::GRPCInferenceService::AsyncService* service, ::grpc::ServerCompletionQueue* cq, size_t max_state_bucket_count, - grpc_compression_level compression_level, + size_t max_response_queue_size, grpc_compression_level compression_level, std::pair restricted_kv, const std::string& forward_header_pattern) : InferHandler( name, tritonserver, service, cq, max_state_bucket_count, - restricted_kv, forward_header_pattern), + max_response_queue_size, restricted_kv, forward_header_pattern), trace_manager_(trace_manager), shm_manager_(shm_manager), compression_level_(compression_level) { diff --git a/src/grpc/stream_infer_handler.h b/src/grpc/stream_infer_handler.h index 40a8346703..1cd4ccbdc4 100644 --- a/src/grpc/stream_infer_handler.h +++ b/src/grpc/stream_infer_handler.h @@ -71,12 +71,12 @@ class ModelStreamInferHandler const std::shared_ptr& shm_manager, inference::GRPCInferenceService::AsyncService* service, ::grpc::ServerCompletionQueue* cq, size_t max_state_bucket_count, - grpc_compression_level compression_level, + size_t max_response_queue_size, grpc_compression_level compression_level, std::pair restricted_kv, const std::string& header_forward_pattern) : InferHandler( name, tritonserver, service, cq, max_state_bucket_count, - restricted_kv, header_forward_pattern), + max_response_queue_size, restricted_kv, header_forward_pattern), trace_manager_(trace_manager), shm_manager_(shm_manager), compression_level_(compression_level) { From 16d347aabfa90bace35ce5bd23e829bcc0a3fdea Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 21 Jan 2025 20:16:09 +0530 Subject: [PATCH 03/17] Update copyright --- src/command_line_parser.cc | 2 +- src/grpc/grpc_server.cc | 2 +- src/grpc/grpc_server.h | 2 +- src/grpc/stream_infer_handler.h | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/command_line_parser.cc b/src/command_line_parser.cc index 5976912c1d..0c113e38a9 100644 --- a/src/command_line_parser.cc +++ b/src/command_line_parser.cc @@ -1,4 +1,4 @@ -// Copyright 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions diff --git a/src/grpc/grpc_server.cc b/src/grpc/grpc_server.cc index f4e90a0c93..5beb3aba72 100644 --- a/src/grpc/grpc_server.cc +++ b/src/grpc/grpc_server.cc @@ -1,4 +1,4 @@ -// Copyright 2019-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2019-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions diff --git a/src/grpc/grpc_server.h b/src/grpc/grpc_server.h index 8c942df060..f5ec5f87cd 100644 --- a/src/grpc/grpc_server.h +++ b/src/grpc/grpc_server.h @@ -1,4 +1,4 @@ -// Copyright 2019-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2019-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions diff --git a/src/grpc/stream_infer_handler.h b/src/grpc/stream_infer_handler.h index 1cd4ccbdc4..75c51b0ec7 100644 --- a/src/grpc/stream_infer_handler.h +++ b/src/grpc/stream_infer_handler.h @@ -1,4 +1,4 @@ -// Copyright 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions From 740d167b39435e5f6d31e89c3bed72b187cae38b Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Fri, 24 Jan 2025 14:49:52 +0530 Subject: [PATCH 04/17] Test case --- qa/L0_decoupled/test.sh | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/qa/L0_decoupled/test.sh b/qa/L0_decoupled/test.sh index 22c37dff49..97e51843e4 100755 --- a/qa/L0_decoupled/test.sh +++ b/qa/L0_decoupled/test.sh @@ -127,6 +127,45 @@ for trial in $TRIALS; do kill $SERVER_PID wait $SERVER_PID + + SERVER_ARGS="--model-repository=$MODELDIR --grpc-max-response-pool-size=1" + SERVER_LOG="grpc_max_response_pool_size_1_${trial}_server.log" + CLIENT_LOG="grpc_max_response_pool_size_1_${trial}_client.log" + run_server + if [ "$SERVER_PID" == "0" ]; then + echo -e "\n***\n*** Failed to start $SERVER\n***" + cat $SERVER_LOG + exit 1 + fi + + for test in \ + test_one_to_none \ + test_one_to_one \ + test_one_to_many \ + test_no_streaming \ + test_response_order \ + test_wrong_shape; do + + echo "Test: $test" >>$CLIENT_LOG + set +e + python $DECOUPLED_TEST DecoupledTest.$test >>$CLIENT_LOG 2>&1 + if [ $? -ne 0 ]; then + echo -e "\n***\n*** Test grpc-max-response-pool-size=1 ${trial} - $test Failed\n***" >>$CLIENT_LOG + echo -e "\n***\n*** Test grpc-max-response-pool-size=1 ${trial} - $test Failed\n***" + RET=1 + else + check_test_results $TEST_RESULT_FILE 1 + if [ $? -ne 0 ]; then + cat $CLIENT_LOG + echo -e "\n***\n*** Test Result Verification Failed\n***" + RET=1 + fi + fi + set -e + done + + kill $SERVER_PID + wait $SERVER_PID done # Test the server frontend can merge the responses of non-decoupled model that From 9020014e5cdea1d821c4373a8486e6d6fe896a9c Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Fri, 24 Jan 2025 14:51:16 +0530 Subject: [PATCH 05/17] Update copyright --- qa/L0_decoupled/test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qa/L0_decoupled/test.sh b/qa/L0_decoupled/test.sh index 97e51843e4..20ca0fffa3 100755 --- a/qa/L0_decoupled/test.sh +++ b/qa/L0_decoupled/test.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright 2020-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions From 477a0a3fb6e142bb4bb03c7aeb0617e77b0175eb Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Sun, 16 Feb 2025 19:25:50 +0000 Subject: [PATCH 06/17] Test case --- Dockerfile.QA | 3 + qa/L0_memory/client.py | 123 +++++++++++++++++++++++++++++++++++++++++ qa/L0_memory/test.sh | 114 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 236 insertions(+), 4 deletions(-) create mode 100644 qa/L0_memory/client.py diff --git a/Dockerfile.QA b/Dockerfile.QA index a1757026c2..41665cdaeb 100644 --- a/Dockerfile.QA +++ b/Dockerfile.QA @@ -128,6 +128,9 @@ RUN mkdir -p qa/common && \ cp bin/multi_server qa/L0_multi_server/. && \ cp bin/memory_test qa/L0_memory/. && \ cp bin/pinned_memory_manager_test qa/L0_memory/. && \ + mkdir qa/L0_memory/python_models/repeat_int32/1 && \ + cp /workspace/tritonbuild/python/examples/decoupled/repeat_model.py qa/L0_memory/python_models/repeat_int32/1/model.py && \ + cp /workspace/tritonbuild/python/examples/decoupled/repeat_config.pbtxt qa/L0_memory/python_models/repeat_int32/1/config.pbtxt && \ cp bin/repo_agent_test qa/L0_triton_repo_agent/. && \ cp lib/libtritonrepoagent_relocation.so qa/L0_triton_repo_agent/. && \ mkdir qa/L0_query/models/query/1 && \ diff --git a/qa/L0_memory/client.py b/qa/L0_memory/client.py new file mode 100644 index 0000000000..5f9233659a --- /dev/null +++ b/qa/L0_memory/client.py @@ -0,0 +1,123 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import unittest +import queue +import os +import numpy as np +import tritonclient.grpc as grpcclient +from tritonclient.utils import InferenceServerException +from functools import partial + +OUTPUT_NUM_ELEMENTS = int(os.getenv("OUTPUT_NUM_ELEMENTS", 1)) + + +class UserData: + def __init__(self): + self._completed_requests = queue.Queue() + + +def callback(user_data, result, error): + if error: + user_data._completed_requests.put(error, timeout=100) + else: + user_data._completed_requests.put(result, timeout=100) + + +class TestTritonInference(unittest.TestCase): + def setUp(self): + self.triton_client = grpcclient.InferenceServerClient(url="localhost:8001") + + def tearDown(self): + self.triton_client.stop_stream() + + def test_inference(self): + model_name = "repeat_int32" + num_responses = 256 + in_data = np.random.randint(0, 1000, num_responses, dtype=np.int32) + delay_data = np.zeros(num_responses, dtype=np.uint32) + wait_data = np.zeros(1, dtype=np.uint32) + user_data = UserData() + + inputs = [ + grpcclient.InferInput("IN", [num_responses], "INT32"), + grpcclient.InferInput("DELAY", [num_responses], "UINT32"), + grpcclient.InferInput("WAIT", [1], "UINT32"), + ] + outputs = [ + grpcclient.InferRequestedOutput("OUT"), + grpcclient.InferRequestedOutput("IDX"), + ] + + inputs[0].set_data_from_numpy(in_data) + inputs[1].set_data_from_numpy(delay_data) + inputs[2].set_data_from_numpy(wait_data) + + self.triton_client.start_stream(callback=partial(callback, user_data)) + self.triton_client.async_stream_infer( + model_name=model_name, + inputs=inputs, + outputs=outputs, + ) + + recv_count = 0 + while recv_count < num_responses: + data_item = user_data._completed_requests.get() + + if isinstance(data_item, InferenceServerException): + self.fail(f"InferenceServerException: {data_item}") + try: + response_idx = data_item.as_numpy("IDX")[0] + response_data = data_item.as_numpy("OUT") + expected_data = in_data[response_idx] + + print("response_data:", response_data, "response_idx", response_idx, "response_data.size:", response_data.size) + + self.assertEqual( + response_data[0], + expected_data, + f"Validation failed at index {response_idx} - response_data[0]: {response_data[0]}, expected_data: {expected_data}", + ) + self.assertEqual( + response_data.size, + OUTPUT_NUM_ELEMENTS, + f"Validation failed - response_data.size: {response_data.size}, OUTPUT_NUM_ELEMENTS: {OUTPUT_NUM_ELEMENTS}", + ) + + except Exception as e: + self.fail(f"Error processing response: {str(e)}") + recv_count += 1 + + self.assertEqual( + user_data._completed_requests.qsize(), + 0, + "Did not receive the expected number of responses.", + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/qa/L0_memory/test.sh b/qa/L0_memory/test.sh index e7c08d9453..3790ea1852 100755 --- a/qa/L0_memory/test.sh +++ b/qa/L0_memory/test.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2020-2025, NVIDIA CORPORATION. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -25,6 +25,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +source ../common/util.sh + TEST_LOG="./memory_test.log" MEMORY_TEST=./memory_test PINNED_MEMORY_MANAGER_TEST=./pinned_memory_manager_test @@ -39,6 +41,7 @@ rm -f TEST_LOG set +e $MEMORY_TEST >>$TEST_LOG 2>&1 if [ $? -ne 0 ]; then + cat $TEST_LOG echo -e "\n***\n*** Memory Test Failed\n***" RET=1 fi @@ -47,16 +50,119 @@ set -e set +e $PINNED_MEMORY_MANAGER_TEST >>$TEST_LOG 2>&1 if [ $? -ne 0 ]; then + cat $TEST_LOG echo -e "\n***\n*** Pinned Memory Manager Test Failed\n***" RET=1 fi set -e + +###### Test --grpc-max-response-pool-size server option ####### + +monitor_memory() { + local SERVER_PID=$1 + local MAX_MEM_FILE=$(mktemp) + echo "0" > "$MAX_MEM_FILE" + ( + local MAX_MEM=0 + while ps -p "$SERVER_PID" >/dev/null 2>&1; do + CURRENT_MEM=$(awk '/Rss:/ {print $2}' /proc/$SERVER_PID/smaps_rollup) + CURRENT_MEM=${CURRENT_MEM:-0} + if [ "$CURRENT_MEM" -gt "$MAX_MEM" ]; then + MAX_MEM=$CURRENT_MEM + echo "$MAX_MEM" > "$MAX_MEM_FILE" + fi + sleep 0.1 + done + echo "$MAX_MEM" > "$MAX_MEM_FILE" + exit 0 + ) & + + MONITOR_PID=$! + echo "$MONITOR_PID $MAX_MEM_FILE" +} + +stop_server_and_monitoring_memory() { + local MONITOR_PID=$1 + local SERVER_PID=$2 + kill "$MONITOR_PID" 2>/dev/null && wait "$MONITOR_PID" 2>/dev/null || true + kill "$SERVER_PID" 2>/dev/null && wait "$SERVER_PID" 2>/dev/null || true +} + +MODELDIR="./python_models" +export OUTPUT_NUM_ELEMENTS=49807360 + +RET=0 +SERVER=/opt/tritonserver/bin/tritonserver +SERVER_BASE_ARGS="--model-repository=${MODELDIR} --log-verbose=2 --allow-metrics=0" + +declare -A MEMORY_USAGE=() + +for POOL_SIZE in 1 50 default; do + if [[ "$POOL_SIZE" = "default" ]]; then + SERVER_ARGS="${SERVER_BASE_ARGS}" + else + SERVER_ARGS="${SERVER_BASE_ARGS} --grpc-max-response-pool-size=${POOL_SIZE}" + fi + + CLIENT_LOG="./client_pool_size_${POOL_SIZE}.log" + SERVER_LOG="./server_pool_size_${POOL_SIZE}.log" + + run_server + if [ "$SERVER_PID" == "0" ]; then + echo -e "\n***\n*** Failed to start $SERVER\n***" + cat $SERVER_LOG + stop_server_and_monitoring_memory $MONITOR_PID $SERVER_PID + exit 1 + fi + sleep 2 + + # Capture initial memory usage + INIT_MEM=$(awk '/Rss:/ {print $2}' /proc/$SERVER_PID/smaps_rollup) + read -r MONITOR_PID MAX_MEM_FILE < <(monitor_memory "$SERVER_PID") + + # Run client script + set +e + python3 client.py >> $CLIENT_LOG 2>&1 + if [ $? -ne 0 ]; then + echo -e "\n***\n*** Running client for grpc-max-response-pool-size=${POOL_SIZE} FAILED\n***" >> $CLIENT_LOG 2>&1 + echo -e "\n***\n*** Running client for grpc-max-response-pool-size=${POOL_SIZE} FAILED\n***" + stop_server_and_monitoring_memory $MONITOR_PID $SERVER_PID + exit 1 + fi + set -e + sleep 2 + + stop_server_and_monitoring_memory $MONITOR_PID $SERVER_PID + + if [[ -s "$MAX_MEM_FILE" ]]; then + MAX_MEM=$(tail -n 1 "$MAX_MEM_FILE" 2>/dev/null || echo 0) + MEMORY_USAGE["$POOL_SIZE"]=$((MAX_MEM - INIT_MEM)) + echo "Pool size: $POOL_SIZE | Initial Memory: ${INIT_MEM} KB | Peak Memory: ${MEMORY_USAGE[$POOL_SIZE]} KB" >> "memory.log" + rm -f "$MAX_MEM_FILE" + else + echo "FAILED to collect memory usage for grpc-max-response-pool-size=${POOL_SIZE}" + exit 1 + fi +done + +prev_mem=0 +prev_size="" +for size in default 50 1; do + current_mem=${MEMORY_USAGE[$size]} + if [[ -n "$prev_size" && "$prev_mem" -ne 0 && "$current_mem" -ge "$prev_mem" ]]; then + echo -e "\n***\n*** FAILED - Memory $current_mem KB with pool=$size >= $prev_mem KB (with pool=$prev_size)\n***" + RET=1 + fi + prev_mem=$current_mem + prev_size=$size +done + + if [ $RET -eq 0 ]; then - echo -e "\n***\n*** Test Passed\n***" + echo -e "\n***\n*** Test Passed\n***" else - cat $TEST_LOG - echo -e "\n***\n*** Test FAILED\n***" + echo -e "\n***\n*** Test FAILED\n***" fi exit $RET From f49203c38460fdbb227f2ae87171fbe64130fdda Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Sun, 16 Feb 2025 19:32:03 +0000 Subject: [PATCH 07/17] Fix pre-commit --- qa/L0_memory/client.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/qa/L0_memory/client.py b/qa/L0_memory/client.py index 5f9233659a..c5d2e77fdb 100644 --- a/qa/L0_memory/client.py +++ b/qa/L0_memory/client.py @@ -25,13 +25,13 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import unittest -import queue import os +import queue +import unittest +from functools import partial import numpy as np import tritonclient.grpc as grpcclient from tritonclient.utils import InferenceServerException -from functools import partial OUTPUT_NUM_ELEMENTS = int(os.getenv("OUTPUT_NUM_ELEMENTS", 1)) @@ -95,8 +95,6 @@ def test_inference(self): response_data = data_item.as_numpy("OUT") expected_data = in_data[response_idx] - print("response_data:", response_data, "response_idx", response_idx, "response_data.size:", response_data.size) - self.assertEqual( response_data[0], expected_data, From 0752b3f0b243428b4da2fcfa8fb4dc4f4cbf125d Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Sun, 16 Feb 2025 19:34:12 +0000 Subject: [PATCH 08/17] Fix pre-commit --- qa/L0_memory/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/qa/L0_memory/client.py b/qa/L0_memory/client.py index c5d2e77fdb..da1b4708df 100644 --- a/qa/L0_memory/client.py +++ b/qa/L0_memory/client.py @@ -29,6 +29,7 @@ import queue import unittest from functools import partial + import numpy as np import tritonclient.grpc as grpcclient from tritonclient.utils import InferenceServerException From 31347a8737ac8465129f2672ab77812ccea9ace9 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 17 Feb 2025 02:15:02 +0530 Subject: [PATCH 09/17] Update --- qa/L0_memory/test.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/qa/L0_memory/test.sh b/qa/L0_memory/test.sh index 3790ea1852..4fa77d6748 100755 --- a/qa/L0_memory/test.sh +++ b/qa/L0_memory/test.sh @@ -91,6 +91,7 @@ stop_server_and_monitoring_memory() { MODELDIR="./python_models" export OUTPUT_NUM_ELEMENTS=49807360 +sed -i '$a\parameters: [{ key: "output_num_elements" value: { string_value: "'"$OUTPUT_NUM_ELEMENTS"'" }}]' $MODELDIR/repeat_int32/config.pbtxt RET=0 SERVER=/opt/tritonserver/bin/tritonserver From 1a6a49787ccbb50571936c3b6fbc129419e46715 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 17 Feb 2025 11:14:59 +0530 Subject: [PATCH 10/17] Update --- Dockerfile.QA | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.QA b/Dockerfile.QA index 41665cdaeb..5c323256e7 100644 --- a/Dockerfile.QA +++ b/Dockerfile.QA @@ -130,7 +130,7 @@ RUN mkdir -p qa/common && \ cp bin/pinned_memory_manager_test qa/L0_memory/. && \ mkdir qa/L0_memory/python_models/repeat_int32/1 && \ cp /workspace/tritonbuild/python/examples/decoupled/repeat_model.py qa/L0_memory/python_models/repeat_int32/1/model.py && \ - cp /workspace/tritonbuild/python/examples/decoupled/repeat_config.pbtxt qa/L0_memory/python_models/repeat_int32/1/config.pbtxt && \ + cp /workspace/tritonbuild/python/examples/decoupled/repeat_config.pbtxt qa/L0_memory/python_models/repeat_int32/config.pbtxt && \ cp bin/repo_agent_test qa/L0_triton_repo_agent/. && \ cp lib/libtritonrepoagent_relocation.so qa/L0_triton_repo_agent/. && \ mkdir qa/L0_query/models/query/1 && \ From 77555bbf5f4cda49d17dd075ee600f159bc0116e Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 17 Feb 2025 18:52:40 +0530 Subject: [PATCH 11/17] Update --- Dockerfile.QA | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Dockerfile.QA b/Dockerfile.QA index 5c323256e7..2205e0c116 100644 --- a/Dockerfile.QA +++ b/Dockerfile.QA @@ -128,9 +128,7 @@ RUN mkdir -p qa/common && \ cp bin/multi_server qa/L0_multi_server/. && \ cp bin/memory_test qa/L0_memory/. && \ cp bin/pinned_memory_manager_test qa/L0_memory/. && \ - mkdir qa/L0_memory/python_models/repeat_int32/1 && \ - cp /workspace/tritonbuild/python/examples/decoupled/repeat_model.py qa/L0_memory/python_models/repeat_int32/1/model.py && \ - cp /workspace/tritonbuild/python/examples/decoupled/repeat_config.pbtxt qa/L0_memory/python_models/repeat_int32/config.pbtxt && \ + mkdir -p qa/L0_memory/python_models/repeat_int32/1 && \ cp bin/repo_agent_test qa/L0_triton_repo_agent/. && \ cp lib/libtritonrepoagent_relocation.so qa/L0_triton_repo_agent/. && \ mkdir qa/L0_query/models/query/1 && \ @@ -267,7 +265,11 @@ RUN cp -r qa/L0_decoupled/models qa/L0_decoupled/python_models/ && \ cp /workspace/tritonbuild/python/examples/decoupled/square_model.py \ qa/L0_decoupled/python_models/square_int32/1/. && \ cp /workspace/tritonbuild/python/examples/decoupled/square_config.pbtxt \ - qa/L0_decoupled/python_models/square_int32/. + qa/L0_decoupled/python_models/square_int32/. && \ + cp /workspace/tritonbuild/python/examples/decoupled/repeat_model.py \ + qa/L0_memory/python_models/repeat_int32/1/model.py && \ + cp /workspace/tritonbuild/python/examples/decoupled/repeat_config.pbtxt \ + qa/L0_memory/python_models/repeat_int32/config.pbtxt RUN mkdir -p qa/L0_decoupled_grpc_error && \ cp -r qa/L0_decoupled/. qa/L0_decoupled_grpc_error From 85ea2b1803d5fe6be895c12d69da1e5ee21a06f2 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 17 Feb 2025 22:41:59 +0530 Subject: [PATCH 12/17] Update --- qa/L0_memory/test.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/qa/L0_memory/test.sh b/qa/L0_memory/test.sh index 4fa77d6748..d0bc8a513b 100755 --- a/qa/L0_memory/test.sh +++ b/qa/L0_memory/test.sh @@ -93,7 +93,6 @@ MODELDIR="./python_models" export OUTPUT_NUM_ELEMENTS=49807360 sed -i '$a\parameters: [{ key: "output_num_elements" value: { string_value: "'"$OUTPUT_NUM_ELEMENTS"'" }}]' $MODELDIR/repeat_int32/config.pbtxt -RET=0 SERVER=/opt/tritonserver/bin/tritonserver SERVER_BASE_ARGS="--model-repository=${MODELDIR} --log-verbose=2 --allow-metrics=0" From b82eda0a3a664a183cde7109a87373f3e1b9c1e7 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 18 Feb 2025 13:03:05 +0530 Subject: [PATCH 13/17] Update documentation --- docs/customization_guide/deploy.md | 15 +++++++++++++++ src/command_line_parser.cc | 4 ++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/docs/customization_guide/deploy.md b/docs/customization_guide/deploy.md index 9a93e8e0d6..85fb6db2f7 100644 --- a/docs/customization_guide/deploy.md +++ b/docs/customization_guide/deploy.md @@ -287,6 +287,21 @@ no untrusted files of same name exist in a location of higher search priority (e.g., System32). It is still recommended to add backend-specific dependencies to their corresponding backend folder when possible. +# GRPC server options +Triton Inference Server's gRPC inference handlers internally use states to manage inference requests and response queues. Each state consists of one inference request and one response queue. The response queue within a state can hold multiple response objects. These states remain allocated for reuse to optimize performance by minimizing dynamic allocations. + +You can configure the following parameters to balance memory usage and server performance: +- The maximum number of states that remain allocated. +- The maximum number of response objects that can stay allocated in the response queue. + +##### `--grpc-infer-allocation-pool-size=` +Specifies the maximum number of states (inference request/response queues) that remain allocated for reuse. If the number of in-flight requests does not exceed this value, no allocation or deallocation of request/response queues will occur. By default, this value is set to `8`. + +##### `--grpc-max-response-pool-size=` +Specifies the maximum number of inference response objects that can remain allocated in each response queue at any given time. This option is particularly useful in decoupled mode, where multiple responses are generated for a single request. By default, this value is set to `INT_MAX`. + +> [!Warning] +> Setting this value too low may negatively impact performance. diff --git a/src/command_line_parser.cc b/src/command_line_parser.cc index 0c113e38a9..bab2e2f48c 100644 --- a/src/command_line_parser.cc +++ b/src/command_line_parser.cc @@ -533,7 +533,7 @@ TritonParser::SetupOptions() grpc_options_.push_back( {OPTION_GRPC_INFER_ALLOCATION_POOL_SIZE, "grpc-infer-allocation-pool-size", Option::ArgInt, - "The maximum number of inference request/response objects that remain " + "The maximum number of states (inference request/response queues) that remain " "allocated for reuse. As long as the number of in-flight requests " "doesn't exceed this value there will be no allocation/deallocation of " "request/response objects."}); @@ -541,7 +541,7 @@ TritonParser::SetupOptions() {OPTION_GRPC_MAX_RESPONSE_POOL_SIZE, "grpc-max-response-pool-size", Option::ArgInt, "The maximum number of inference response objects that can remain " - "allocated in the pool at any given time."}); + "allocated in the response queue at any given time."}); grpc_options_.push_back( {OPTION_GRPC_USE_SSL, "grpc-use-ssl", Option::ArgBool, "Use SSL authentication for GRPC requests. Default is false."}); From 5cfab023c69c314ddf69bc69cd9506d7c0aa15a9 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 18 Feb 2025 14:33:37 +0530 Subject: [PATCH 14/17] Update --- docs/customization_guide/deploy.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/customization_guide/deploy.md b/docs/customization_guide/deploy.md index 85fb6db2f7..36b048da35 100644 --- a/docs/customization_guide/deploy.md +++ b/docs/customization_guide/deploy.md @@ -1,5 +1,5 @@