refactor: otaproxy: implement resource limit for requests handling and cache r/w, refactor cache_streaming with r/w thread pools #575
base: main
Conversation
for more information, see https://pre-commit.ci
…eption to indicate the client retries again
otaclient v3.7.1, single client:
With PR, 5 clients:
The memory directly used by otaproxy is very stable during the multi-client downloading test, and the reclaiming of the file page cache can also be observed.
@@ -472,7 +470,7 @@ def resources_count(self) -> int:
        )

    try:
-       _query = _orm.orm_execute(_sql_stmt)
+       _query = _orm.orm_execute(_sql_stmt, row_factory=sqlite3.Row)
By default, the ORM applies a custom row_factory that tries to convert the raw result into a cache db entry, but here we do SELECT count(*), so the result is not a db entry.
Although the custom row_factory detects whether the raw result is actually an entry and, if not, returns the result as-is, it is better to use sqlite3.Row
as the row_factory when we know we are not selecting db entries in the first place.
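A minimal sketch of the point above, using the stdlib sqlite3 module directly (the table name and columns here are illustrative, not the actual cache db schema): for an aggregate query such as SELECT count(*), sqlite3.Row gives name-based column access without any entry conversion or validation.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE cache_meta (url TEXT PRIMARY KEY, digest TEXT)")
con.executemany("INSERT INTO cache_meta VALUES (?, ?)", [("a", "x"), ("b", "y")])

# sqlite3.Row is the right row_factory for aggregate queries: the result
# is not a table entry, so no entry-oriented conversion should be attempted
con.row_factory = sqlite3.Row
row = con.execute("SELECT count(*) AS cnt FROM cache_meta").fetchone()
print(row["cnt"])  # → 2, columns accessible by name
con.close()
```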
anyio.run(
    _server.serve,
    backend="asyncio",
    backend_options={"loop_factory": uvloop.new_event_loop},
This is the anyio-recommended way to set up uvloop.
# suppress logging from third-party deps
logging.basicConfig(level=logging.CRITICAL)
logger.setLevel(logging.INFO)
For starting otaproxy standalone, configure logging. Note that here we filter out third-party deps' logging and set the ota_proxy
logger level to INFO.
Cleanup and refactor of cache header parsing/exporting. test_cache_control_headers.py
ensures that the new implementation still has the same behavior as the previous one.
The major changes introduced by this PR.
A new cache handling model is implemented:
- Cache write and cache read worker thread pools are introduced; all IO operations are dispatched to these thread pools.
- For each worker thread pool, a pending-task limit is applied; incoming cache requests that exceed the limit are dropped to avoid unbounded memory usage.
The following behaviors are kept in the new implementation:
- For cache teeing from remote resource downloading, we still ensure the data streams back to the client first, even if the cache writing fails.
- Cache writing and cache db entry committing are separated from streaming data back to the client, i.e., they are handled in worker threads and will not interfere with client request handling.
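The pending-task limit described above can be sketched with a semaphore-guarded thread pool; this is an illustrative pattern, not the PR's actual implementation (names like `BoundedPool` and `try_submit` are made up here):

```python
import concurrent.futures
import threading

class BoundedPool:
    """A worker pool that rejects submissions beyond a pending-task budget,
    instead of letting the executor's internal queue grow unboundedly."""

    def __init__(self, workers: int, max_pending: int):
        self._pool = concurrent.futures.ThreadPoolExecutor(max_workers=workers)
        self._sem = threading.Semaphore(max_pending)

    def try_submit(self, fn, *args):
        # non-blocking acquire: if the pending budget is exhausted, drop the task
        if not self._sem.acquire(blocking=False):
            return None  # caller should treat this as "pool busy"
        fut = self._pool.submit(fn, *args)
        fut.add_done_callback(lambda _: self._sem.release())
        return fut

# usage: with 1 worker and a pending budget of 2, the 3rd and 4th
# submissions are rejected while the first task blocks the worker
gate = threading.Event()
pool = BoundedPool(workers=1, max_pending=2)
results = [pool.try_submit(gate.wait) for _ in range(4)]
print([r is not None for r in results])  # → [True, True, False, False]
gate.set()
```

Dropping instead of queuing is what keeps memory use bounded: the rejected request surfaces as a "busy" error to the client rather than as a buffered task.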
    _configured_con_factory = partial(_con_factory, db_f, thread_wait_timeout)
    self._async_db = AsyncCacheMetaORM(
        table_name=table_name,
-       con_factory=_con_factory,
-       number_of_cons=thread_nums,
+       con_factory=_configured_con_factory,
+       number_of_cons=read_db_thread_nums,
        row_factory="table_spec_no_validation",
    )
+   self._db = CacheMetaORMPool(
+       table_name=table_name,
+       con_factory=_configured_con_factory,
+       number_of_cons=write_db_thread_nums,
+       row_factory="table_spec_no_validation",
+   )
Here we use two ORMs: one for cache db entry committing (the sync ORM), one for looking up cache entries (the async ORM).
The sync ORM works with the thread pool, while the async ORM works in the main event loop.
-   @asynccontextmanager
-   async def _error_handling_for_cache_retrieving(self, url: str, send):
-       _is_succeeded = asyncio.Event()
-       _common_err_msg = f"request for {url=} failed"
+   async def _error_handling_for_cache_retrieving(
+       self, exc: Exception, url: str, send
+   ) -> None:
Don't implement the exception handler as a context manager anymore; implement it as a normal function instead.
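The shape of this refactor can be sketched as follows; the handler and exception names here are illustrative stand-ins, not the PR's actual code. The caller catches the exception and delegates to a plain async function that maps exception types to responses:

```python
import asyncio

async def _error_handler(exc: Exception, url: str) -> int:
    # plain async function instead of an @asynccontextmanager wrapper:
    # map the raised exception type to an HTTP status code
    if isinstance(exc, TimeoutError):
        return 503  # illustrative: treat upstream timeouts as "busy"
    return 500

async def handle_request(url: str) -> int:
    try:
        raise TimeoutError("upstream timed out")  # stand-in for real work
    except Exception as exc:
        return await _error_handler(exc, url)

print(asyncio.run(handle_request("https://example.test/resource")))  # → 503
```

Passing the caught exception in explicitly keeps the control flow visible at the call site, instead of hiding it behind a context manager's `__aexit__`.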
src/ota_proxy/server_app.py
        max_concurrent_requests: int = cfg.MAX_CONCURRENT_REQUESTS,
    ):
        self._lock = asyncio.Lock()
        self._closed = True
        self._ota_cache = ota_cache

+       self._se = asyncio.Semaphore(max_concurrent_requests)
For the server app, also introduce a semaphore to limit concurrent requests.
        if self._se.locked():
            burst_suppressed_logger.warning(
                f"exceed max pending requests: {self.max_concurrent_requests}, respond with 429"
            )
            await self._respond_with_error(HTTPStatus.TOO_MANY_REQUESTS, "", send)
            return
For incoming requests exceeding the limit, return 429.
    def __init__(
        self,
        ota_cache: OTACache,
        *,
        max_concurrent_requests: int = cfg.MAX_CONCURRENT_REQUESTS,
    ):
        self._lock = asyncio.Lock()
        self._closed = True
        self._ota_cache = ota_cache

+       self.max_concurrent_requests = max_concurrent_requests
+       self._se = asyncio.Semaphore(max_concurrent_requests)
On the server_app side, also introduce a semaphore to restrict the number of concurrently handled requests.
"""The cache blob storage is located at <cache_mnt_point>/data.""" | ||
|
||
# ------ task management ------ # | ||
MAX_CONCURRENT_REQUESTS = 1024 |
For requests exceeding this limit, HTTP error 429 will be returned.
    # helper methods

-   def parse_raw_headers(raw_headers: List[Tuple[bytes, bytes]]) -> Dict[str, str]:
+   def parse_raw_headers(raw_headers: list[tuple[bytes, bytes]]) -> CIMultiDict[str]:
Use a case-insensitive dict from the very beginning of header parsing to prevent accidentally dropping any headers.
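A sketch of the idea, assuming the third-party `multidict` package (which provides the `CIMultiDict` used in the diff): ASGI-style raw headers are parsed directly into a case-insensitive multidict, so later lookups cannot miss a header over casing differences, and duplicated header fields are preserved.

```python
from multidict import CIMultiDict

def parse_raw_headers(raw_headers: list[tuple[bytes, bytes]]) -> "CIMultiDict[str]":
    parsed: "CIMultiDict[str]" = CIMultiDict()
    for _k, _v in raw_headers:
        # add() keeps duplicated header fields instead of overwriting them
        parsed.add(_k.decode("utf-8"), _v.decode("utf-8"))
    return parsed

headers = parse_raw_headers([(b"Content-Type", b"text/plain"), (b"ETag", b'"abc"')])
print(headers["content-type"])  # case-insensitive lookup → text/plain
```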
        if isinstance(exc, (ReaderPoolBusy, CacheProviderNotReady)):
            _err_msg = f"{_common_err_msg} due to otaproxy is busy: {exc!r}"
            burst_suppressed_logger.error(_err_msg)
            await self._respond_with_error(
                HTTPStatus.SERVICE_UNAVAILABLE, "otaproxy internal busy", send
            )
Return 503 (Service Unavailable) when otacache internal r/w thread pools are busy.
-   async def read_file(fpath: PathLike) -> AsyncIterator[bytes]:
+   async def read_file(
+       fpath: PathLike, chunk_size: int = cfg.LOCAL_READ_SIZE
+   ) -> AsyncGenerator[bytes]:
        """Open and read a file asynchronously."""
        async with await open_file(fpath, "rb") as f:
-           while data := await f.read(cfg.CHUNK_SIZE):
+           fd = f.wrapped.fileno()
+           os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_SEQUENTIAL)
+           while data := await f.read(chunk_size):
                yield data
+           os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)


+   def read_file_once(fpath: StrOrPath | anyio.Path) -> bytes:
+       with open(fpath, "rb") as f:
+           fd = f.fileno()
+           os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_SEQUENTIAL)
+           data = f.read()
+           os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)
+       return data
Use os.posix_fadvise
to prevent the kernel from holding the page caches of opened/written files after the files are closed.
Introduction
This PR refactors the cache read/write implementation (cache_streaming.py), offloading IO operations and limiting resource and memory use. Also, a limit on concurrent request handling is applied at the request handling entry.
Major changes:
Other changes:
Ticket
https://tier4.atlassian.net/browse/RT4-18003