From 3575e0b21be8e00d70ce1e1d6a65366ef6f7e44c Mon Sep 17 00:00:00 2001
From: DouEnergy
Date: Mon, 7 Apr 2025 17:12:23 +0800
Subject: [PATCH 1/8] add timestamp as part of cache file name

---
 ibis-server/app/query_cache/__init__.py | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/ibis-server/app/query_cache/__init__.py b/ibis-server/app/query_cache/__init__.py
index 6a993f852..061d62fc0 100644
--- a/ibis-server/app/query_cache/__init__.py
+++ b/ibis-server/app/query_cache/__init__.py
@@ -1,4 +1,5 @@
 import hashlib
+import time
 from typing import Any, Optional
 
 import ibis
@@ -64,8 +65,16 @@ def _generate_cache_key(self, data_source: str, sql: str, info) -> str:
 
         return hashlib.sha256(key_string.encode()).hexdigest()
 
-    def _get_cache_file_name(self, cache_key: str) -> str:
-        return f"{cache_key}.cache"
+    def _get_cache_file_name(self, cache_key: str) -> str | None:
+        op = self._get_dal_operator()
+        for file in op.list("/"):
+            if file.path.startswith(cache_key):
+                return file.path
+
+        # If no cache file found, create a new one
+        # Get unix timestamp without decimal part (truncates milliseconds)
+        cache_create_timestamp = int(time.time())
+        return f"{cache_key}-{cache_create_timestamp}.cache"
 
     def _get_full_path(self, path: str) -> str:
         return self.root + path

From 3fb3c97ffdfc90401416f2bf6ad7b62c16db002b Mon Sep 17 00:00:00 2001
From: DouEnergy
Date: Mon, 7 Apr 2025 17:46:29 +0800
Subject: [PATCH 2/8] when cache hit , also return the create at timestamp in
 header

---
 ibis-server/app/query_cache/__init__.py | 15 +++++++++++++++
 ibis-server/app/routers/v2/connector.py |  7 ++++++-
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/ibis-server/app/query_cache/__init__.py b/ibis-server/app/query_cache/__init__.py
index 061d62fc0..3d88c4d7e 100644
--- a/ibis-server/app/query_cache/__init__.py
+++ b/ibis-server/app/query_cache/__init__.py
@@ -53,6 +53,18 @@ def set(self, data_source: str, sql: str, result: Any, info) -> None:
             logger.debug(f"Failed to write query cache: {e}")
             return
 
+    def get_cache_file_timestamp(self, data_source: str, sql: str, info) -> int:
+        cache_key = self._generate_cache_key(data_source, sql, info)
+        op = self._get_dal_operator()
+        for file in op.list("/"):
+            if file.path.startswith(cache_key):
+                # xxxxxxxxxxxxxx-1744016574.cache
+                # we only care about the timestamp part
+                timestamp = int(file.path.split("-")[-1].split(".")[0])
+                return timestamp
+
+        return None
+
     def _generate_cache_key(self, data_source: str, sql: str, info) -> str:
         key_parts = [
             data_source,
@@ -72,6 +84,9 @@ def _get_cache_file_name(self, cache_key: str) -> str | None:
         op = self._get_dal_operator()
         for file in op.list("/"):
             if file.path.startswith(cache_key):
                 return file.path
 
         # If no cache file found, create a new one
+        # | hash |                        | timestamp |
+        # 0dba4b451b77e62de2e6dfa78579a40-1744016574.cache
+
         # Get unix timestamp without decimal part (truncates milliseconds)
         cache_create_timestamp = int(time.time())
         return f"{cache_key}-{cache_create_timestamp}.cache"
diff --git a/ibis-server/app/routers/v2/connector.py b/ibis-server/app/routers/v2/connector.py
index 8e931b775..5cc9b2c77 100644
--- a/ibis-server/app/routers/v2/connector.py
+++ b/ibis-server/app/routers/v2/connector.py
@@ -84,7 +84,7 @@ async def query(
 
         if cache_enable:
             cached_result = query_cache_manager.get(
-                str(data_source), dto.sql, dto.connection_info
+                data_source, dto.sql, dto.connection_info
             )
             cache_hit = cached_result is not None
 
@@ -92,6 +92,11 @@ async def query(
             span.add_event("cache hit")
             response = ORJSONResponse(to_json(cached_result))
             response.headers["X-Cache-Hit"] = str(cache_hit).lower()
+            response.headers["X-Cache-Create-At"] = str(
+                query_cache_manager.get_cache_file_timestamp(
+                    data_source, dto.sql, dto.connection_info
+                )
+            )
             return response
         else:
             result = connector.query(rewritten_sql, limit=limit)

From a16673d1ad0504784f26f54719782c9d4c93555b Mon Sep 17 00:00:00 2001
From: DouEnergy
Date: Tue, 8 Apr 2025 09:10:06 +0800
Subject: [PATCH 3/8] add create timestamp

---
 ibis-server/app/routers/v3/connector.py                 | 7 ++++++-
 ibis-server/tests/routers/v2/connector/test_postgres.py | 2 ++
 .../tests/routers/v3/connector/postgres/test_query.py   | 2 ++
 3 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/ibis-server/app/routers/v3/connector.py b/ibis-server/app/routers/v3/connector.py
index c1934c381..65493f145 100644
--- a/ibis-server/app/routers/v3/connector.py
+++ b/ibis-server/app/routers/v3/connector.py
@@ -69,7 +69,7 @@ async def query(
 
         if cache_enable:
             cached_result = query_cache_manager.get(
-                str(data_source), dto.sql, dto.connection_info
+                data_source, dto.sql, dto.connection_info
            )
             cache_hit = cached_result is not None
 
@@ -77,6 +77,11 @@ async def query(
             span.add_event("cache hit")
             response = ORJSONResponse(to_json(cached_result))
             response.headers["X-Cache-Hit"] = str(cache_hit).lower()
+            response.headers["X-Cache-Create-At"] = str(
+                query_cache_manager.get_cache_file_timestamp(
+                    data_source, dto.sql, dto.connection_info
+                )
+            )
             return response
         else:
             result = connector.query(rewritten_sql, limit=limit)
diff --git a/ibis-server/tests/routers/v2/connector/test_postgres.py b/ibis-server/tests/routers/v2/connector/test_postgres.py
index 6c2b24546..1c67797ee 100644
--- a/ibis-server/tests/routers/v2/connector/test_postgres.py
+++ b/ibis-server/tests/routers/v2/connector/test_postgres.py
@@ -179,6 +179,7 @@ async def test_query_with_cache(client, manifest_str, postgres: PostgresContaine
 
     assert response1.status_code == 200
     assert response1.headers["X-Cache-Hit"] == "false"
+    assert response1.headers["X-Cache-Create-At"] is None
     result1 = response1.json()
 
     # Second request with same SQL - should hit cache
@@ -192,6 +193,7 @@ async def test_query_with_cache(client, manifest_str, postgres: PostgresContaine
     )
     assert response2.status_code == 200
     assert response2.headers["X-Cache-Hit"] == "true"
+    assert int(response2.headers["X-Cache-Create-At"]) > 1743984000  # 2025.04.07
     result2 = response2.json()
 
     # Verify results are identical
diff --git a/ibis-server/tests/routers/v3/connector/postgres/test_query.py b/ibis-server/tests/routers/v3/connector/postgres/test_query.py
index dd7afd888..6de57c933 100644
--- a/ibis-server/tests/routers/v3/connector/postgres/test_query.py
+++ b/ibis-server/tests/routers/v3/connector/postgres/test_query.py
@@ -149,6 +149,7 @@ async def test_query_with_cache(client, manifest_str, connection_info):
 
     assert response1.status_code == 200
     assert response1.headers["X-Cache-Hit"] == "false"
+    assert response1.headers["X-Cache-Create-At"] is None
     result1 = response1.json()
 
     # Second request with same SQL - should hit cache
@@ -163,6 +164,7 @@ async def test_query_with_cache(client, manifest_str, connection_info):
 
     assert response2.status_code == 200
     assert response2.headers["X-Cache-Hit"] == "true"
+    assert int(response2.headers["X-Cache-Create-At"]) > 1743984000  # 2025.04.07
     result2 = response2.json()
 
     assert result1["data"] == result2["data"]

From 65e4473f3547d1fc40b3fe7aa890214b7d9596eb Mon Sep 17 00:00:00 2001
From: DouEnergy
Date: Tue, 8 Apr 2025 09:35:32 +0800
Subject: [PATCH 4/8] fix test

---
 ibis-server/tests/routers/v2/connector/test_postgres.py       | 1 -
 ibis-server/tests/routers/v3/connector/postgres/test_query.py | 1 -
 2 files changed, 2 deletions(-)

diff --git a/ibis-server/tests/routers/v2/connector/test_postgres.py b/ibis-server/tests/routers/v2/connector/test_postgres.py
index 1c67797ee..f9760c763 100644
--- a/ibis-server/tests/routers/v2/connector/test_postgres.py
+++ b/ibis-server/tests/routers/v2/connector/test_postgres.py
@@ -179,7 +179,6 @@ async def test_query_with_cache(client, manifest_str, postgres: PostgresContaine
 
     assert response1.status_code == 200
     assert response1.headers["X-Cache-Hit"] == "false"
-    assert response1.headers["X-Cache-Create-At"] is None
     result1 = response1.json()
 
     # Second request with same SQL - should hit cache
diff --git a/ibis-server/tests/routers/v3/connector/postgres/test_query.py b/ibis-server/tests/routers/v3/connector/postgres/test_query.py
index 6de57c933..8423cc3ba 100644
--- a/ibis-server/tests/routers/v3/connector/postgres/test_query.py
+++ b/ibis-server/tests/routers/v3/connector/postgres/test_query.py
@@ -149,7 +149,6 @@ async def test_query_with_cache(client, manifest_str, connection_info):
 
     assert response1.status_code == 200
     assert response1.headers["X-Cache-Hit"] == "false"
-    assert response1.headers["X-Cache-Create-At"] is None
     result1 = response1.json()
 
     # Second request with same SQL - should hit cache

From 169000a60df1499868a3aeaf13f2803b9df3d069 Mon Sep 17 00:00:00 2001
From: DouEnergy
Date: Tue, 8 Apr 2025 09:53:21 +0800
Subject: [PATCH 5/8] override test

---
 ibis-server/app/routers/v2/connector.py      | 23 +++++++++++++--
 ibis-server/app/routers/v3/connector.py      |  2 ++
 .../routers/v2/connector/test_postgres.py    | 29 +++++++++++++++++++
 3 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/ibis-server/app/routers/v2/connector.py b/ibis-server/app/routers/v2/connector.py
index 5cc9b2c77..ff8ac6654 100644
--- a/ibis-server/app/routers/v2/connector.py
+++ b/ibis-server/app/routers/v2/connector.py
@@ -43,6 +43,7 @@ async def query(
     dto: QueryDTO,
     dry_run: Annotated[bool, Query(alias="dryRun")] = False,
     cache_enable: Annotated[bool, Query(alias="cacheEnable")] = False,
+    override_cache: Annotated[bool, Query(alias="overrideCache")] = False,
     limit: int | None = None,
     java_engine_connector: JavaEngineConnector = Depends(get_java_engine_connector),
     query_cache_manager: QueryCacheManager = Depends(get_query_cache_manager),
@@ -88,7 +89,7 @@ async def query(
             )
             cache_hit = cached_result is not None
 
-        if cache_hit:
+        if cache_hit and not override_cache:
             span.add_event("cache hit")
             response = ORJSONResponse(to_json(cached_result))
             response.headers["X-Cache-Hit"] = str(cache_hit).lower()
@@ -100,13 +101,29 @@ async def query(
             return response
         else:
             result = connector.query(rewritten_sql, limit=limit)
+
+            response = ORJSONResponse(to_json(result))
+            response.headers["X-Cache-Hit"] = str(cache_hit).lower()
+            if cache_hit:
+                response.headers["X-Cache-Create-At"] = str(
+                    query_cache_manager.get_cache_file_timestamp(
+                        data_source, dto.sql, dto.connection_info
+                    )
+                )
+
             if cache_enable:
                 query_cache_manager.set(
                     data_source, dto.sql, result, dto.connection_info
                 )
 
-            response = ORJSONResponse(to_json(result))
-            response.headers["X-Cache-Hit"] = str(cache_hit).lower()
+            if override_cache:
+                response.headers["X-cache-override"] = str(override_cache).lower()
+                response.headers["X-cache-override-at"] = str(
+                    query_cache_manager.get_cache_file_timestamp(
+                        data_source, dto.sql, dto.connection_info
+                    )
+                )
+
             return response
 
 
diff --git a/ibis-server/app/routers/v3/connector.py b/ibis-server/app/routers/v3/connector.py
index 65493f145..c7553d300 100644
--- a/ibis-server/app/routers/v3/connector.py
+++ b/ibis-server/app/routers/v3/connector.py
@@ -38,6 +38,7 @@ async def query(
     dto: QueryDTO,
     dry_run: Annotated[bool, Query(alias="dryRun")] = False,
     cache_enable: Annotated[bool, Query(alias="cacheEnable")] = False,
+    override_cache: Annotated[bool, Query(alias="overrideCache")] = False,
     limit: int | None = None,
     headers: Annotated[str | None, Header()] = None,
     java_engine_connector: JavaEngineConnector = Depends(get_java_engine_connector),
@@ -102,6 +103,7 @@ async def query(
             dto,
             dry_run,
             cache_enable,
+            override_cache,
             limit,
             java_engine_connector,
             query_cache_manager,
diff --git a/ibis-server/tests/routers/v2/connector/test_postgres.py b/ibis-server/tests/routers/v2/connector/test_postgres.py
index f9760c763..9651764e2 100644
--- a/ibis-server/tests/routers/v2/connector/test_postgres.py
+++ b/ibis-server/tests/routers/v2/connector/test_postgres.py
@@ -201,6 +201,35 @@ async def test_query_with_cache(client, manifest_str, postgres: PostgresContaine
     assert result1["dtypes"] == result2["dtypes"]
 
 
+async def test_query_with_cache_override(
+    client, manifest_str, postgres: PostgresContainer
+):
+    connection_info = _to_connection_info(postgres)
+    # First request - should miss cache then create cache
+    response1 = await client.post(
+        url=f"{base_url}/query?cacheEnable=true",  # Enable cache
+        json={
+            "connectionInfo": connection_info,
+            "manifestStr": manifest_str,
+            "sql": 'SELECT * FROM "Orders" LIMIT 10',
+        },
+    )
+    assert response1.status_code == 200
+
+    # Second request with same SQL - should hit cache
+    response2 = await client.post(
+        url=f"{base_url}/query?cacheEnable=true&overrideCache=true",  # Enable cache
+        json={
+            "connectionInfo": connection_info,
+            "manifestStr": manifest_str,
+            "sql": 'SELECT * FROM "Orders" LIMIT 10',
+        },
+    )
+    assert response2.status_code == 200
+    assert response2.headers["X-Cache-Override"] == "true"
+    assert int(response2.headers["X-Cache-Override-At"]) > 1743984000  # 2025.04.07
+
+
 async def test_query_with_connection_url(
     client, manifest_str, postgres: PostgresContainer
 ):

From 7f8c425c6fee81e2c09b9f7a86ba9cc57eabafef Mon Sep 17 00:00:00 2001
From: DouEnergy
Date: Tue, 8 Apr 2025 16:17:31 +0800
Subject: [PATCH 6/8] add v3 test

---
 ibis-server/app/query_cache/__init__.py       | 20 ++++++++-----
 ibis-server/app/routers/v3/connector.py       | 22 +++++++++++++--
 .../routers/v2/connector/test_postgres.py     |  6 ++--
 .../v3/connector/postgres/test_query.py       | 28 +++++++++++++++++++
 4 files changed, 64 insertions(+), 12 deletions(-)

diff --git a/ibis-server/app/query_cache/__init__.py b/ibis-server/app/query_cache/__init__.py
index 3d88c4d7e..59a7a6a2f 100644
--- a/ibis-server/app/query_cache/__init__.py
+++ b/ibis-server/app/query_cache/__init__.py
@@ -38,7 +38,7 @@ def get(self, data_source: str, sql: str, info) -> Optional[Any]:
     @tracer.start_as_current_span("set_cache", kind=trace.SpanKind.INTERNAL)
     def set(self, data_source: str, sql: str, result: Any, info) -> None:
         cache_key = self._generate_cache_key(data_source, sql, info)
-        cache_file_name = self._get_cache_file_name(cache_key)
+        cache_file_name = self._set_cache_file_name(cache_key)
         op = self._get_dal_operator()
         full_path = self._get_full_path(cache_file_name)
 
@@ -77,18 +77,24 @@ def _generate_cache_key(self, data_source: str, sql: str, info) -> str:
 
         return hashlib.sha256(key_string.encode()).hexdigest()
 
-    def _get_cache_file_name(self, cache_key: str) -> str | None:
+    def _get_cache_file_name(self, cache_key: str) -> str:
         op = self._get_dal_operator()
         for file in op.list("/"):
             if file.path.startswith(cache_key):
                 return file.path
 
-        # If no cache file found, create a new one
-        # | hash |                        | timestamp |
-        # 0dba4b451b77e62de2e6dfa78579a40-1744016574.cache
+        cache_create_timestamp = int(time.time() * 1000)
+        return f"{cache_key}-{cache_create_timestamp}.cache"
+
+    def _set_cache_file_name(self, cache_key: str) -> str:
+        # Delete old cache files, make only one cache file per query
+        op = self._get_dal_operator()
+        for file in op.list("/"):
+            if file.path.startswith(cache_key):
+                logger.info(f"Deleting old cache file {file.path}")
+                op.delete(file.path)
 
-        # Get unix timestamp without decimal part (truncates milliseconds)
-        cache_create_timestamp = int(time.time())
+        cache_create_timestamp = int(time.time() * 1000)
         return f"{cache_key}-{cache_create_timestamp}.cache"
 
     def _get_full_path(self, path: str) -> str:
diff --git a/ibis-server/app/routers/v3/connector.py b/ibis-server/app/routers/v3/connector.py
index c7553d300..bc3f8a94a 100644
--- a/ibis-server/app/routers/v3/connector.py
+++ b/ibis-server/app/routers/v3/connector.py
@@ -74,7 +74,7 @@ async def query(
             )
             cache_hit = cached_result is not None
 
-        if cache_hit:
+        if cache_hit and not override_cache:
             span.add_event("cache hit")
             response = ORJSONResponse(to_json(cached_result))
             response.headers["X-Cache-Hit"] = str(cache_hit).lower()
@@ -86,12 +86,28 @@ async def query(
             return response
         else:
             result = connector.query(rewritten_sql, limit=limit)
+
+            response = ORJSONResponse(to_json(result))
+            response.headers["X-Cache-Hit"] = str(cache_hit).lower()
+            if cache_hit:
+                response.headers["X-Cache-Create-At"] = str(
+                    query_cache_manager.get_cache_file_timestamp(
+                        data_source, dto.sql, dto.connection_info
+                    )
+                )
+
             if cache_enable:
                 query_cache_manager.set(
                     data_source, dto.sql, result, dto.connection_info
                 )
-            response = ORJSONResponse(to_json(result))
-            response.headers["X-Cache-Hit"] = str(cache_hit).lower()
+
+            if override_cache:
+                response.headers["X-cache-override"] = str(override_cache).lower()
+                response.headers["X-cache-override-at"] = str(
+                    query_cache_manager.get_cache_file_timestamp(
+                        data_source, dto.sql, dto.connection_info
+                    )
+                )
             return response
     except Exception as e:
         logger.warning(
diff --git a/ibis-server/tests/routers/v2/connector/test_postgres.py b/ibis-server/tests/routers/v2/connector/test_postgres.py
index 9651764e2..014862dce 100644
--- a/ibis-server/tests/routers/v2/connector/test_postgres.py
+++ b/ibis-server/tests/routers/v2/connector/test_postgres.py
@@ -216,7 +216,7 @@ async def test_query_with_cache_override(
     )
     assert response1.status_code == 200
 
-    # Second request with same SQL - should hit cache
+    # Second request with same SQL - should hit cache and override it
     response2 = await client.post(
         url=f"{base_url}/query?cacheEnable=true&overrideCache=true",  # Enable cache
         json={
@@ -227,7 +227,9 @@ async def test_query_with_cache_override(
     )
     assert response2.status_code == 200
     assert response2.headers["X-Cache-Override"] == "true"
-    assert int(response2.headers["X-Cache-Override-At"]) > 1743984000  # 2025.04.07
+    assert int(response2.headers["X-Cache-Override-At"]) > int(
+        response2.headers["X-Cache-Create-At"]
+    )
 
 
 async def test_query_with_connection_url(
diff --git a/ibis-server/tests/routers/v3/connector/postgres/test_query.py b/ibis-server/tests/routers/v3/connector/postgres/test_query.py
index 8423cc3ba..c38c63401 100644
--- a/ibis-server/tests/routers/v3/connector/postgres/test_query.py
+++ b/ibis-server/tests/routers/v3/connector/postgres/test_query.py
@@ -171,6 +171,34 @@ async def test_query_with_cache(client, manifest_str, connection_info):
     assert result1["dtypes"] == result2["dtypes"]
 
 
+async def test_query_with_cache_override(client, manifest_str, connection_info):
+    # First request - should miss cache then create cache
+    response1 = await client.post(
+        url=f"{base_url}/query?cacheEnable=true",  # Enable cache
+        json={
+            "connectionInfo": connection_info,
+            "manifestStr": manifest_str,
+            "sql": "SELECT * FROM wren.public.orders LIMIT 1",
+        },
+    )
+    assert response1.status_code == 200
+
+    # Second request with same SQL - should hit cache and override it
+    response2 = await client.post(
+        url=f"{base_url}/query?cacheEnable=true&overrideCache=true",  # Override the cache
+        json={
+            "connectionInfo": connection_info,
+            "manifestStr": manifest_str,
+            "sql": "SELECT * FROM wren.public.orders LIMIT 1",
+        },
+    )
+    assert response2.status_code == 200
+    assert response2.headers["X-Cache-Override"] == "true"
+    assert int(response2.headers["X-Cache-Override-At"]) > int(
+        response2.headers["X-Cache-Create-At"]
+    )
+
+
 async def test_query_with_connection_url(client, manifest_str, connection_url):
     response = await client.post(
         url=f"{base_url}/query",

From 1e824434602719c86f608fdcc849ab0ec71cb559 Mon Sep 17 00:00:00 2001
From: DouEnergy
Date: Tue, 8 Apr 2025 18:14:48 +0800
Subject: [PATCH 7/8] refactor cache condition code

---
 ibis-server/app/routers/v2/connector.py | 57 ++++++++++++++----------
 ibis-server/app/routers/v3/connector.py | 58 +++++++++++++++----------
 2 files changed, 71 insertions(+), 44 deletions(-)

diff --git a/ibis-server/app/routers/v2/connector.py b/ibis-server/app/routers/v2/connector.py
index ff8ac6654..339f344e0 100644
--- a/ibis-server/app/routers/v2/connector.py
+++ b/ibis-server/app/routers/v2/connector.py
@@ -89,42 +89,55 @@ async def query(
             )
             cache_hit = cached_result is not None
 
-        if cache_hit and not override_cache:
-            span.add_event("cache hit")
-            response = ORJSONResponse(to_json(cached_result))
-            response.headers["X-Cache-Hit"] = str(cache_hit).lower()
-            response.headers["X-Cache-Create-At"] = str(
-                query_cache_manager.get_cache_file_timestamp(
-                    data_source, dto.sql, dto.connection_info
+        match (cache_enable, cache_hit, override_cache):
+            # case 1 cache hit read
+            case (True, True, False):
+                span.add_event("cache hit")
+                response = ORJSONResponse(to_json(cached_result))
+                response.headers["X-Cache-Hit"] = "true"
+                response.headers["X-Cache-Create-At"] = str(
+                    query_cache_manager.get_cache_file_timestamp(
+                        data_source, dto.sql, dto.connection_info
+                    )
                 )
-            )
-            return response
-        else:
-            result = connector.query(rewritten_sql, limit=limit)
-
-            response = ORJSONResponse(to_json(result))
-            response.headers["X-Cache-Hit"] = str(cache_hit).lower()
-            if cache_hit:
+            # case 2 cache hit but override cache
+            case (True, True, True):
+                result = connector.query(rewritten_sql, limit=limit)
+                response = ORJSONResponse(to_json(result))
+                # because we override the cache, so we need to set the cache hit to false
+                response.headers["X-Cache-Hit"] = "false"
                 response.headers["X-Cache-Create-At"] = str(
                     query_cache_manager.get_cache_file_timestamp(
                         data_source, dto.sql, dto.connection_info
                     )
                 )
-
-            if cache_enable:
                 query_cache_manager.set(
                     data_source, dto.sql, result, dto.connection_info
                 )
-
-            if override_cache:
-                response.headers["X-cache-override"] = str(override_cache).lower()
-                response.headers["X-cache-override-at"] = str(
+                response.headers["X-Cache-Override"] = "true"
+                response.headers["X-Cache-Override-At"] = str(
                     query_cache_manager.get_cache_file_timestamp(
                         data_source, dto.sql, dto.connection_info
                     )
                 )
+            # case 3 and case 4 cache miss read (first time cache read need to create cache)
+            # no matter the cache override or not, we need to create cache
+            case (True, False, _):
+                result = connector.query(rewritten_sql, limit=limit)
-            return response
+                # set cache
+                query_cache_manager.set(
+                    data_source, dto.sql, result, dto.connection_info
+                )
+                response = ORJSONResponse(to_json(result))
+                response.headers["X-Cache-Hit"] = "false"
+            # case 5~8 Other cases (cache is not enabled)
+            case (False, _, _):
+                result = connector.query(rewritten_sql, limit=limit)
+                response = ORJSONResponse(to_json(result))
+                response.headers["X-Cache-Hit"] = "false"
+
+        return response
 
 
 @router.post("/{data_source}/validate/{rule_name}", deprecated=True)
diff --git a/ibis-server/app/routers/v3/connector.py b/ibis-server/app/routers/v3/connector.py
index bc3f8a94a..5847ad9f2 100644
--- a/ibis-server/app/routers/v3/connector.py
+++ b/ibis-server/app/routers/v3/connector.py
@@ -74,41 +74,55 @@ async def query(
             )
             cache_hit = cached_result is not None
 
-        if cache_hit and not override_cache:
-            span.add_event("cache hit")
-            response = ORJSONResponse(to_json(cached_result))
-            response.headers["X-Cache-Hit"] = str(cache_hit).lower()
-            response.headers["X-Cache-Create-At"] = str(
-                query_cache_manager.get_cache_file_timestamp(
-                    data_source, dto.sql, dto.connection_info
+        match (cache_enable, cache_hit, override_cache):
+            # case 1 cache hit read
+            case (True, True, False):
+                span.add_event("cache hit")
+                response = ORJSONResponse(to_json(cached_result))
+                response.headers["X-Cache-Hit"] = "true"
+                response.headers["X-Cache-Create-At"] = str(
+                    query_cache_manager.get_cache_file_timestamp(
+                        data_source, dto.sql, dto.connection_info
+                    )
                 )
-            )
-            return response
-        else:
-            result = connector.query(rewritten_sql, limit=limit)
-
-            response = ORJSONResponse(to_json(result))
-            response.headers["X-Cache-Hit"] = str(cache_hit).lower()
-            if cache_hit:
+            # case 2 cache hit but override cache
+            case (True, True, True):
+                result = connector.query(rewritten_sql, limit=limit)
+                response = ORJSONResponse(to_json(result))
+                # because we override the cache, so we need to set the cache hit to false
+                response.headers["X-Cache-Hit"] = "false"
                 response.headers["X-Cache-Create-At"] = str(
                     query_cache_manager.get_cache_file_timestamp(
                         data_source, dto.sql, dto.connection_info
                    )
                 )
-
-            if cache_enable:
                 query_cache_manager.set(
                     data_source, dto.sql, result, dto.connection_info
                 )
-
-            if override_cache:
-                response.headers["X-cache-override"] = str(override_cache).lower()
-                response.headers["X-cache-override-at"] = str(
+                response.headers["X-Cache-Override"] = "true"
+                response.headers["X-Cache-Override-At"] = str(
                     query_cache_manager.get_cache_file_timestamp(
                         data_source, dto.sql, dto.connection_info
                     )
                 )
-            return response
+            # case 3 and case 4 cache miss read (first time cache read need to create cache)
+            case (True, False, _):
+                result = connector.query(rewritten_sql, limit=limit)
+
+                # set cache
+                query_cache_manager.set(
+                    data_source, dto.sql, result, dto.connection_info
+                )
+                response = ORJSONResponse(to_json(result))
+                response.headers["X-Cache-Hit"] = "false"
+            # case 5~8 Other cases (cache is not enabled)
+            case (False, _, _):
+                result = connector.query(rewritten_sql, limit=limit)
+                response = ORJSONResponse(to_json(result))
+                response.headers["X-Cache-Hit"] = "false"
+
+        return response
     except Exception as e:
         logger.warning(
             "Failed to execute v3 query, fallback to v2: {}\n" + MIGRATION_MESSAGE,

From 2decaf4f1259aadfb878e08d9115f2acd294cd53 Mon Sep 17 00:00:00 2001
From: DouEnergy
Date: Tue, 8 Apr 2025 18:26:10 +0800
Subject: [PATCH 8/8] add fallback test

---
 ibis-server/app/query_cache/__init__.py       | 12 +++++---
 .../v3/connector/postgres/test_fallback_v2.py | 29 +++++++++++++++++++
 2 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/ibis-server/app/query_cache/__init__.py b/ibis-server/app/query_cache/__init__.py
index 59a7a6a2f..fc04260b4 100644
--- a/ibis-server/app/query_cache/__init__.py
+++ b/ibis-server/app/query_cache/__init__.py
@@ -53,16 +53,20 @@ def set(self, data_source: str, sql: str, result: Any, info) -> None:
             logger.debug(f"Failed to write query cache: {e}")
             return
 
-    def get_cache_file_timestamp(self, data_source: str, sql: str, info) -> int:
+    def get_cache_file_timestamp(self, data_source: str, sql: str, info) -> int | None:
         cache_key = self._generate_cache_key(data_source, sql, info)
         op = self._get_dal_operator()
         for file in op.list("/"):
             if file.path.startswith(cache_key):
                 # xxxxxxxxxxxxxx-1744016574.cache
                 # we only care about the timestamp part
-                timestamp = int(file.path.split("-")[-1].split(".")[0])
-                return timestamp
-
+                try:
+                    timestamp = int(file.path.split("-")[-1].split(".")[0])
+                    return timestamp
+                except (IndexError, ValueError) as e:
+                    logger.debug(
+                        f"Failed to extract timestamp from cache file {file.path}: {e}"
+                    )
         return None
 
     def _generate_cache_key(self, data_source: str, sql: str, info) -> str:
diff --git a/ibis-server/tests/routers/v3/connector/postgres/test_fallback_v2.py b/ibis-server/tests/routers/v3/connector/postgres/test_fallback_v2.py
index 883a382b6..aa68593e5 100644
--- a/ibis-server/tests/routers/v3/connector/postgres/test_fallback_v2.py
+++ b/ibis-server/tests/routers/v3/connector/postgres/test_fallback_v2.py
@@ -71,6 +71,7 @@ async def test_query_with_cache(client, manifest_str, connection_info):
 
     assert response2.status_code == 200
     assert response2.headers["X-Cache-Hit"] == "true"
+    assert int(response2.headers["X-Cache-Create-At"]) > 1743984000  # 2025.04.07
     result2 = response2.json()
 
     # Verify results are identical
@@ -79,6 +80,34 @@ async def test_query_with_cache(client, manifest_str, connection_info):
     assert result1["dtypes"] == result2["dtypes"]
 
 
+async def test_query_with_cache_override(client, manifest_str, connection_info):
+    # First request - should miss cache then create cache
+    response1 = await client.post(
+        url=f"{base_url}/query?cacheEnable=true",  # Enable cache
+        json={
+            "connectionInfo": connection_info,
+            "manifestStr": manifest_str,
+            "sql": "SELECT orderkey FROM orders LIMIT 1",
+        },
+    )
+    assert response1.status_code == 200
+
+    # Second request with same SQL - should hit cache and override it
+    response2 = await client.post(
+        url=f"{base_url}/query?cacheEnable=true&overrideCache=true",  # Override the cache
+        json={
+            "connectionInfo": connection_info,
+            "manifestStr": manifest_str,
+            "sql": "SELECT orderkey FROM orders LIMIT 1",
+        },
+    )
+    assert response2.status_code == 200
+    assert response2.headers["X-Cache-Override"] == "true"
+    assert int(response2.headers["X-Cache-Override-At"]) > int(
+        response2.headers["X-Cache-Create-At"]
+    )
+
+
 async def test_dry_run(client, manifest_str, connection_info):
     response = await client.post(
         url=f"{base_url}/query",