refactor: otaproxy: implement resource limit for requests handling and cache r/w, refactor cache_streaming with r/w thread pools #575

Open · wants to merge 12 commits into base: main
Conversation

@Bodong-Yang Bodong-Yang commented Jun 22, 2025

Introduction

This PR refactors the caching read/write implementation (cache_streaming.py), offloading IO operations to worker threads and limiting resource and memory use. A limit on concurrent request handling is also applied at the request handling entry point.

Major changes:

  1. server_app: limit the number of concurrently handled requests.
  2. cache_streaming: introduce read/write worker thread pools for cache r/w operations; refine cache writing, cache streaming, and cache reading.

Other changes:

  1. db & lru_cache_helper: update in accordance with the cache_streaming implementation changes.
  2. cache_control_header: refine the implementation.

Ticket

https://tier4.atlassian.net/browse/RT4-18003

@Bodong-Yang Bodong-Yang added refactor Rewrite/remove related code instead of patching them need_backport: v3.9 labels Jun 22, 2025
@Bodong-Yang
Member Author

Bodong-Yang commented Jun 23, 2025

otaclient v3.7.1, single client:

● otaproxy.service - OTA Client
     Loaded: loaded (/etc/systemd/system/otaproxy.service; disabled; vendor preset: enabled)
     Active: active (running) since Wed 2025-06-18 12:11:21 JST; 1min 28s ago
   Main PID: 14816 (python3)
      Tasks: 9 (limit: 9457)
     Memory: 1.3G
        CPU: 48.543s
     CGroup: /system.slice/otaproxy.service
             └─14816 /opt/ota/client/venv/bin/python3 -m otaclient.ota_proxy --host 0.0.0.0 --port 8888 --enable-cache --enable-https
root@autoware:/opt/ota/client# cat /sys/fs/cgroup/system.slice/otaproxy.service/memory.stat 
anon 39096320
file 1678356480
...

With PR, 5 clients:

● otaproxy.service - OTA proxy
     Loaded: loaded (/etc/systemd/system/otaproxy.service; static)
     Active: active (running) since Mon 2025-06-23 09:32:42 JST; 5min ago
   Main PID: 10431 (python3)
      Tasks: 25 (limit: 2288)
     Memory: 252.9M
        CPU: 4min 56.051s
     CGroup: /system.slice/otaproxy.service
             └─10431 /opt/ota/client2/venv/bin/python3 -m ota_proxy --host 0.0.0.0 --port 8888 --enable-cache --enable-https
root@autoware:/home/autoware# cat /sys/fs/cgroup/system.slice/otaproxy.service/memory.stat 
anon 77803520 # 77MB
file 57675776 # 58MB
...

The memory directly used by otaproxy stays very stable during the multi-client download test, and reclaiming of the file page cache can also be observed.

@@ -472,7 +470,7 @@ def resources_count(self) -> int:
)

try:
_query = _orm.orm_execute(_sql_stmt)
_query = _orm.orm_execute(_sql_stmt, row_factory=sqlite3.Row)
Member Author

By default, the ORM applies a custom row_factory that tries to convert the raw result into a cache db entry, but here we execute SELECT count(*), so the result is not a db entry.
Although the custom row_factory detects whether the raw result is actually an entry and, if not, returns it as-is, it is better to use sqlite3.Row as the row_factory when we know we are not selecting db entries in the first place.
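The point can be illustrated standalone with an in-memory database; the table name and schema below are hypothetical, not the PR's actual cache schema:

```python
import sqlite3

# In-memory db standing in for the cache db; schema is illustrative only.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE cache_meta (url TEXT, size INTEGER)")
con.executemany(
    "INSERT INTO cache_meta VALUES (?, ?)",
    [("a", 1), ("b", 2), ("c", 3)],
)

# sqlite3.Row gives index- and name-based access without trying to
# map the aggregate result onto a table-spec entry.
con.row_factory = sqlite3.Row
row = con.execute("SELECT count(*) AS cnt FROM cache_meta").fetchone()
print(row["cnt"])  # → 3
con.close()
```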

anyio.run(
    _server.serve,
    backend="asyncio",
    backend_options={"loop_factory": uvloop.new_event_loop},
)
Member Author

The anyio-recommended way to set up uvloop.

Comment on lines +77 to +79
# suppress logging from third-party deps
logging.basicConfig(level=logging.CRITICAL)
logger.setLevel(logging.INFO)
Member Author

When starting otaproxy standalone, configure logging: filter out third-party deps' logging and set the ota_proxy logger level to INFO.

Member Author

Cleanup and refactor of cache-control header parsing/exporting. test_cache_control_headers.py ensures that the new implementation behaves identically to the previous one.

Member Author

The major changes introduced by this PR.

A new cache handling model is implemented:

  1. A cache write worker thread pool and cache read worker threads are introduced; all IO operations are dispatched to these pools.
  2. Each worker thread pool has a pending-task limit; incoming cache requests that exceed it are dropped to avoid unbounded memory usage.

The following behaviors are still kept in the new implementation:

  1. For cache teeing during remote resource downloading, we still ensure the data streams back to the client first, even if cache writing fails.
  2. Cache writing and cache db entry commits are separated from streaming data back to the client, i.e., they are handled in worker threads and do not interfere with client request handling.
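The pending-task limit described above can be sketched with stdlib pieces; the class name, worker counts, and drop-on-full policy below are illustrative, not the PR's actual implementation:

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Optional


class BoundedPool:
    """Thread pool that rejects submissions once too many tasks are pending."""

    def __init__(self, workers: int, max_pending: int):
        self._pool = ThreadPoolExecutor(max_workers=workers)
        self._slots = threading.BoundedSemaphore(max_pending)

    def try_submit(self, fn, *args) -> Optional[Future]:
        # Non-blocking acquire: if all slots are taken, drop the request
        # instead of queueing it, keeping memory usage bounded.
        if not self._slots.acquire(blocking=False):
            return None
        fut = self._pool.submit(fn, *args)
        fut.add_done_callback(lambda _: self._slots.release())
        return fut


pool = BoundedPool(workers=2, max_pending=4)
results = [pool.try_submit(lambda x=i: x * 2) for i in range(8)]
accepted = [f for f in results if f is not None]
print(len(accepted))  # between 4 and 8, depending on how fast tasks finish
```

The key design point mirrored here is that rejection happens at submission time, so a burst of requests can never grow the pending queue without bound.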

Comment on lines +67 to 79
_configured_con_factory = partial(_con_factory, db_f, thread_wait_timeout)
self._async_db = AsyncCacheMetaORM(
table_name=table_name,
con_factory=_con_factory,
number_of_cons=thread_nums,
con_factory=_configured_con_factory,
number_of_cons=read_db_thread_nums,
row_factory="table_spec_no_validation",
)
self._db = CacheMetaORMPool(
table_name=table_name,
con_factory=_configured_con_factory,
number_of_cons=write_db_thread_nums,
row_factory="table_spec_no_validation",
)
Member Author

Here we use two ORMs: one for committing cache db entries (the sync ORM) and one for looking up cache entries (the async ORM).

The sync ORM works with the thread pool, while the async ORM works in the main event loop.

Comment on lines -206 to +221
@asynccontextmanager
async def _error_handling_for_cache_retrieving(self, url: str, send):
_is_succeeded = asyncio.Event()
_common_err_msg = f"request for {url=} failed"
async def _error_handling_for_cache_retrieving(
self, exc: Exception, url: str, send
) -> None:
Member Author

The exception handler is no longer implemented as a context manager; it is now a plain function.

Comment on lines 144 to 150
max_concurrent_requests: int = cfg.MAX_CONCURRENT_REQUESTS,
):
self._lock = asyncio.Lock()
self._closed = True
self._ota_cache = ota_cache

self._se = asyncio.Semaphore(max_concurrent_requests)
Member Author

On the server app side, also introduce a semaphore to limit the number of concurrently handled requests.

Comment on lines +353 to +358
if self._se.locked():
burst_suppressed_logger.warning(
f"exceed max pending requests: {self.max_concurrent_requests}, respond with 429"
)
await self._respond_with_error(HTTPStatus.TOO_MANY_REQUESTS, "", send)
return
Member Author

For requests exceeding the limit, return 429.
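The locked()-then-429 pattern above can be shown in a self-contained form; the handler, limit, and sleep below are illustrative stand-ins for real request handling:

```python
import asyncio
from http import HTTPStatus

MAX_CONCURRENT = 2
_se = asyncio.Semaphore(MAX_CONCURRENT)


async def handle(request_id: int) -> int:
    # Fast-fail while the semaphore is exhausted instead of queueing,
    # so a burst of requests cannot pile up unbounded.
    if _se.locked():
        return HTTPStatus.TOO_MANY_REQUESTS  # 429
    async with _se:
        await asyncio.sleep(0.05)  # stand-in for real request handling
        return HTTPStatus.OK


async def main() -> list:
    return await asyncio.gather(*(handle(i) for i in range(5)))


statuses = asyncio.run(main())
print(statuses)  # first two requests get 200, the rest get 429
```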

Comment on lines +140 to +151
def __init__(
self,
ota_cache: OTACache,
*,
max_concurrent_requests: int = cfg.MAX_CONCURRENT_REQUESTS,
):
self._lock = asyncio.Lock()
self._closed = True
self._ota_cache = ota_cache

self.max_concurrent_requests = max_concurrent_requests
self._se = asyncio.Semaphore(max_concurrent_requests)
Member Author

On the server_app side, also introduce a semaphore to restrict the number of requests being handled at once.

"""The cache blob storage is located at <cache_mnt_point>/data."""

# ------ task management ------ #
MAX_CONCURRENT_REQUESTS = 1024
Member Author

For requests exceeding the limit, HTTP error 429 will be returned.

# helper methods


def parse_raw_headers(raw_headers: List[Tuple[bytes, bytes]]) -> Dict[str, str]:
def parse_raw_headers(raw_headers: list[tuple[bytes, bytes]]) -> CIMultiDict[str]:
Member Author

Use a case-insensitive dict from the very beginning of header parsing to prevent accidentally dropping any headers.
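To illustrate why case-insensitivity matters at parse time, here is a minimal stdlib stand-in for the behavior (the actual code uses CIMultiDict from the multidict library; the header name below is illustrative):

```python
class CIDict(dict):
    """Minimal case-insensitive dict, standing in for multidict.CIMultiDict."""

    def __setitem__(self, key: str, value: str) -> None:
        super().__setitem__(key.lower(), value)

    def __getitem__(self, key: str) -> str:
        return super().__getitem__(key.lower())


def parse_raw_headers(raw_headers: list) -> CIDict:
    # Decode ASGI-style (bytes, bytes) header pairs into a
    # case-insensitive mapping right away, so later lookups cannot
    # miss a header just because a client sent unusual casing.
    parsed = CIDict()
    for k, v in raw_headers:
        parsed[k.decode("utf-8")] = v.decode("utf-8")
    return parsed


headers = parse_raw_headers([(b"Ota-File-Cache-Control", b"use_cache")])
print(headers["ota-file-cache-control"])  # → use_cache
print(headers["OTA-FILE-CACHE-CONTROL"])  # → use_cache
```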

Comment on lines +225 to +230
if isinstance(exc, (ReaderPoolBusy, CacheProviderNotReady)):
_err_msg = f"{_common_err_msg} due to otaproxy is busy: {exc!r}"
burst_suppressed_logger.error(_err_msg)
await self._respond_with_error(
HTTPStatus.SERVICE_UNAVAILABLE, "otaproxy internal busy", send
)
Member Author

Return 503 (Service Unavailable) when the otacache internal r/w thread pools are busy.

Comment on lines -12 to +35
async def read_file(fpath: PathLike) -> AsyncIterator[bytes]:
async def read_file(
fpath: PathLike, chunk_size: int = cfg.LOCAL_READ_SIZE
) -> AsyncGenerator[bytes]:
"""Open and read a file asynchronously."""
async with await open_file(fpath, "rb") as f:
while data := await f.read(cfg.CHUNK_SIZE):
fd = f.wrapped.fileno()
os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_SEQUENTIAL)
while data := await f.read(chunk_size):
yield data
os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)


def read_file_once(fpath: StrOrPath | anyio.Path) -> bytes:
with open(fpath, "rb") as f:
fd = f.fileno()
os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_SEQUENTIAL)
data = f.read()
os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)
return data
Member Author

Use os.posix_fadvise to prevent the kernel from holding the page caches of opened/written files after the files are closed.
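The same page-cache hint can be applied on the write side; this is a hedged sketch rather than the PR's exact code, and since os.posix_fadvise is POSIX/Linux-only, the call is guarded for portability:

```python
import os
import tempfile


def write_file_once(fpath: str, data: bytes) -> None:
    with open(fpath, "wb") as f:
        f.write(data)
        f.flush()
        fd = f.fileno()
        if hasattr(os, "posix_fadvise"):
            # DONTNEED only drops clean pages, so sync dirty pages first,
            # then ask the kernel to evict this file's page cache.
            os.fsync(fd)
            os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)


tmp = os.path.join(tempfile.gettempdir(), "fadvise_demo.bin")
write_file_once(tmp, b"cached blob")
print(os.path.getsize(tmp))  # → 11
```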

@Bodong-Yang Bodong-Yang changed the title refactor: otaproxy: implement resource limit for requests handling, introduce read/write thread pools for IO operations refactor: otaproxy: implement resource limit for requests handling and cache r/w, refactor cache_streaming with r/w thread pools Jun 25, 2025
@Bodong-Yang Bodong-Yang marked this pull request as ready for review June 25, 2025 06:24
@Bodong-Yang Bodong-Yang requested a review from a team as a code owner June 25, 2025 06:24
Labels
need_backport: v3.9 refactor Rewrite/remove related code instead of patching them