Commit a644fc4

refactor(backport v3.8.x): ota_proxy: refine request handling flow (#443)
This PR simplifies the request handling flow: instead of implementing the guard condition checks inside the retrieve_file API, the guard condition checks are now implemented within each _retrieve_file handler.
1 parent 23e98b4 commit a644fc4
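
For readers skimming the diff below, here is a minimal, hypothetical sketch of the dispatch pattern this refactor moves to: each _retrieve_file-style handler checks its own guard conditions and returns None to signal "fall through", so the exposed API simply tries the handlers in order and falls back to plain downloading. The class and handler names in the sketch (CacheDispatchSketch, _by_external_cache, and so on) are illustrative only and do not appear in ota_proxy.

# Hypothetical sketch only -- names below are NOT from ota_proxy/ota_cache.py.
import asyncio
from typing import Optional


class CacheDispatchSketch:
    def __init__(self, cache_enabled: bool, external_cache: bool) -> None:
        self._cache_enabled = cache_enabled
        self._external_cache = external_cache

    async def _by_external_cache(self, url: str, *, no_cache: bool) -> Optional[str]:
        # guard conditions live inside the handler; None means "fall through"
        if not self._external_cache or no_cache:
            return None
        return f"external-cache:{url}"

    async def _by_cache_lookup(self, url: str, *, no_cache: bool) -> Optional[str]:
        if not self._cache_enabled or no_cache:
            return None
        return f"local-cache:{url}"

    async def _by_downloading(self, url: str) -> str:
        # last resort: always handled by going to the remote
        return f"download:{url}"

    async def retrieve_file(self, url: str, *, no_cache: bool = False) -> str:
        # the exposed API is reduced to trying each handler in order
        if res := await self._by_external_cache(url, no_cache=no_cache):
            return res
        if res := await self._by_cache_lookup(url, no_cache=no_cache):
            return res
        return await self._by_downloading(url)


if __name__ == "__main__":
    dispatcher = CacheDispatchSketch(cache_enabled=True, external_cache=False)
    print(asyncio.run(dispatcher.retrieve_file("https://example.com/ota/file")))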

File tree

1 file changed: +102 -79 lines changed


src/ota_proxy/ota_cache.py

Lines changed: 102 additions & 79 deletions
@@ -22,7 +22,7 @@
 import time
 from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
-from typing import AsyncIterator, Dict, List, Mapping, Optional, Tuple
+from typing import AsyncIterator, Mapping, Optional
 from urllib.parse import SplitResult, quote, urlsplit

 import aiohttp
@@ -276,7 +276,7 @@ def _background_check_free_space(self):
             )
             time.sleep(cfg.DISK_USE_PULL_INTERVAL)

-    def _cache_entries_cleanup(self, entry_hashes: List[str]):
+    def _cache_entries_cleanup(self, entry_hashes: list[str]) -> None:
         """Cleanup entries indicated by entry_hashes list."""
         for entry_hash in entry_hashes:
             # remove cache entry
@@ -368,7 +368,7 @@ async def _retrieve_file_by_downloading(
         raw_url: str,
         *,
         headers: Mapping[str, str],
-    ) -> Tuple[AsyncIterator[bytes], CIMultiDictProxy[str]]:
+    ) -> tuple[AsyncIterator[bytes], CIMultiDictProxy[str]]:
         async def _do_request() -> AsyncIterator[bytes]:
             async with self._session.get(
                 self._process_raw_url(raw_url),
@@ -392,28 +392,33 @@ async def _do_request() -> AsyncIterator[bytes]:
         resp_headers: CIMultiDictProxy[str] = await (_remote_fd := _do_request()).__anext__()  # type: ignore
         return _remote_fd, resp_headers

-    async def _retrieve_file_by_cache(
-        self, cache_identifier: str, *, retry_cache: bool
+    async def _retrieve_file_by_cache_lookup(
+        self, *, raw_url: str, cache_policy: OTAFileCacheControl
     ) -> tuple[AsyncIterator[bytes], CIMultiDict[str]] | None:
         """
         Returns:
             A tuple of bytes iterator and headers dict for back to client.
         """
-        # cache file available, lookup the db for metadata
+        if (
+            not self._cache_enabled
+            or cache_policy.no_cache
+            or cache_policy.retry_caching
+        ):
+            return
+
+        cache_identifier = cache_policy.file_sha256
+        if not cache_identifier:
+            # fallback to use URL based hash, and clear compression_alg for such case
+            cache_identifier = url_based_hash(raw_url)
+
         meta_db_entry = await self._lru_helper.lookup_entry(cache_identifier)
         if not meta_db_entry:
             return

         # NOTE: db_entry.file_sha256 can be either
         #       1. valid sha256 value for corresponding plain uncompressed OTA file
         #       2. URL based sha256 value for corresponding requested URL
-        # otaclient indicates that this cache entry is invalid, cleanup and exit
         cache_file = self._base_dir / cache_identifier
-        if retry_cache:
-            logger.debug(f"requested with retry_cache: {meta_db_entry=}..")
-            await self._lru_helper.remove_entry(cache_identifier)
-            cache_file.unlink(missing_ok=True)
-            return

         # check if cache file exists
         # NOTE(20240729): there is an edge condition that the finished cached file is not yet renamed,
@@ -423,7 +428,6 @@ async def _retrieve_file_by_cache(
         for _retry_count in range(_retry_count_max):
             if cache_file.is_file():
                 break
-
             await asyncio.sleep(get_backoff(_retry_count, _factor, _backoff_max))

         if not cache_file.is_file():
@@ -445,7 +449,12 @@ async def _retrieve_file_by_external_cache(
         self, client_cache_policy: OTAFileCacheControl
     ) -> tuple[AsyncIterator[bytes], CIMultiDict[str]] | None:
         # skip if not external cache or otaclient doesn't sent valid file_sha256
-        if not self._external_cache or not client_cache_policy.file_sha256:
+        if (
+            not self._external_cache
+            or client_cache_policy.no_cache
+            or client_cache_policy.retry_caching
+            or not client_cache_policy.file_sha256
+        ):
             return

         cache_identifier = client_cache_policy.file_sha256
@@ -473,83 +482,34 @@ async def _retrieve_file_by_external_cache(
         )
         return read_file(cache_file, executor=self._executor), _header

-    # exposed API
-
-    async def retrieve_file(
+    async def _retrieve_file_by_new_caching(
         self,
+        *,
         raw_url: str,
-        headers_from_client: Dict[str, str],
-    ) -> tuple[AsyncIterator[bytes], CIMultiDict[str] | CIMultiDictProxy[str]] | None:
-        """Retrieve a file descriptor for the requested <raw_url>.
-
-        This method retrieves a file descriptor for incoming client request.
-        Upper uvicorn app can use this file descriptor to yield chunks of data,
-        and send chunks to the on-calling ota_client.
-
-        NOTE: use raw_url in all operations, except opening remote file.
-
-        Args:
-            raw_url: unquoted raw url received from uvicorn
-            headers_from_client: headers come from client's request, which will be
-                passthrough to upper otaproxy and/or remote OTA image server.
-
-        Returns:
-            A tuple contains an asyncio generator for upper server app to yield data chunks from
-            and headers dict that should be sent back to client in resp.
-        """
-        if self._closed:
-            raise BaseOTACacheError("ota cache pool is closed")
-
-        cache_policy = OTAFileCacheControl.parse_header(
-            headers_from_client.get(HEADER_OTA_FILE_CACHE_CONTROL, "")
-        )
-        if cache_policy.no_cache:
-            logger.info(f"client indicates that do not cache for {raw_url=}")
-
-        if not self._upper_proxy:
-            headers_from_client.pop(HEADER_OTA_FILE_CACHE_CONTROL, None)
-
-        # --- case 1: not using cache, directly download file --- #
-        if (
-            not self._cache_enabled  # ota_proxy is configured to not cache anything
-            or cache_policy.no_cache  # ota_client send request with no_cache policy
-            or not self._storage_below_hard_limit_event.is_set()  # disable cache if space hardlimit is reached
-        ):
-            logger.debug(
-                f"not use cache({self._cache_enabled=}, {cache_policy=}, "
-                f"{self._storage_below_hard_limit_event.is_set()=}): {raw_url=}"
-            )
-            return await self._retrieve_file_by_downloading(
-                raw_url, headers=headers_from_client
-            )
-
-        # --- case 2: if externel cache source available, try to use it --- #
-        # NOTE: if client requsts with retry_caching directive, it may indicate cache corrupted
-        #       in external cache storage, in such case we should skip the use of external cache.
+        cache_policy: OTAFileCacheControl,
+        headers_from_client: dict[str, str],
+    ) -> tuple[AsyncIterator[bytes], CIMultiDictProxy[str] | CIMultiDict[str]] | None:
+        # NOTE(20241202): no new cache on hard limit being reached
         if (
-            self._external_cache
-            and not cache_policy.retry_caching
-            and (_res := await self._retrieve_file_by_external_cache(cache_policy))
+            not self._cache_enabled
+            or cache_policy.no_cache
+            or not self._storage_below_hard_limit_event.is_set()
         ):
-            return _res
+            return

-        # pre-calculated cache_identifier and corresponding compression_alg
         cache_identifier = cache_policy.file_sha256
         compression_alg = cache_policy.file_compression_alg
-
-        # fallback to use URL based hash, and clear compression_alg
         if not cache_identifier:
+            # fallback to use URL based hash, and clear compression_alg for such case
             cache_identifier = url_based_hash(raw_url)
             compression_alg = ""

-        # --- case 3: try to use local cache --- #
-        if _res := await self._retrieve_file_by_cache(
-            cache_identifier, retry_cache=cache_policy.retry_caching
-        ):
-            return _res
+        # if set, cleanup any previous cache file before starting new cache
+        if cache_policy.retry_caching:
+            logger.debug(f"requested with retry_cache for {raw_url=} ...")
+            await self._lru_helper.remove_entry(cache_identifier)
+            (self._base_dir / cache_identifier).unlink(missing_ok=True)

-        # --- case 4: no cache available, streaming remote file and cache --- #
-        # a online tracker is available for this requrest
         if (tracker := self._on_going_caching.get_tracker(cache_identifier)) and (
             subscription := await tracker.subscribe_tracker()
         ):
@@ -588,3 +548,66 @@ async def retrieve_file(
             raise
         finally:
             tracker = None  # remove ref
+
+    # exposed API
+
+    async def retrieve_file(
+        self, raw_url: str, headers_from_client: dict[str, str]
+    ) -> tuple[AsyncIterator[bytes], CIMultiDict[str] | CIMultiDictProxy[str]] | None:
+        """Retrieve a file descriptor for the requested <raw_url>.
+
+        This method retrieves a file descriptor for incoming client request.
+        Upper uvicorn app can use this file descriptor to yield chunks of data,
+        and send chunks to the on-calling ota_client.
+
+        NOTE: use raw_url in all operations, except opening remote file.
+
+        Args:
+            raw_url: unquoted raw url received from uvicorn
+            headers_from_client: headers come from client's request, which will be
+                passthrough to upper otaproxy and/or remote OTA image server.
+
+        Returns:
+            A tuple contains an asyncio generator for upper server app to yield data chunks from
+            and headers dict that should be sent back to client in resp.
+        """
+        if self._closed:
+            raise BaseOTACacheError("ota cache pool is closed")
+
+        cache_policy = OTAFileCacheControl.parse_header(
+            headers_from_client.get(HEADER_OTA_FILE_CACHE_CONTROL, "")
+        )
+        if cache_policy.no_cache:
+            logger.info(f"client indicates that do not cache for {raw_url=}")
+
+        # when there is no upper_proxy, do not passthrough the OTA_FILE_CACHE_CONTROL header.
+        if not self._upper_proxy:
+            headers_from_client.pop(HEADER_OTA_FILE_CACHE_CONTROL, None)
+
+        # a fastpath when cache is not enabled or client requires so
+        if not self._cache_enabled or cache_policy.no_cache:
+            return await self._retrieve_file_by_downloading(
+                raw_url, headers=headers_from_client
+            )
+
+        # NOTE(20241202): behavior changed: even if _cache_enabled is False, if external_cache is configured
+        #   and loaded, still try to use external cache source.
+        if _res := await self._retrieve_file_by_external_cache(cache_policy):
+            return _res
+
+        if _res := await self._retrieve_file_by_cache_lookup(
+            raw_url=raw_url, cache_policy=cache_policy
+        ):
+            return _res
+
+        if _res := await self._retrieve_file_by_new_caching(
+            raw_url=raw_url,
+            cache_policy=cache_policy,
+            headers_from_client=headers_from_client,
+        ):
+            return _res
+
+        # as last resort, finally try to handle the request by directly downloading
+        return await self._retrieve_file_by_downloading(
+            raw_url, headers=headers_from_client
+        )
