diff --git a/ibis-server/app/query_cache/__init__.py b/ibis-server/app/query_cache/__init__.py
index 6a993f852..fc04260b4 100644
--- a/ibis-server/app/query_cache/__init__.py
+++ b/ibis-server/app/query_cache/__init__.py
@@ -1,4 +1,5 @@
 import hashlib
+import time
 from typing import Any, Optional
 
 import ibis
@@ -37,7 +38,7 @@ def get(self, data_source: str, sql: str, info) -> Optional[Any]:
 
     @tracer.start_as_current_span("set_cache", kind=trace.SpanKind.INTERNAL)
     def set(self, data_source: str, sql: str, result: Any, info) -> None:
         cache_key = self._generate_cache_key(data_source, sql, info)
-        cache_file_name = self._get_cache_file_name(cache_key)
+        cache_file_name = self._set_cache_file_name(cache_key)
         op = self._get_dal_operator()
         full_path = self._get_full_path(cache_file_name)
@@ -52,6 +53,22 @@ def set(self, data_source: str, sql: str, result: Any, info) -> None:
             logger.debug(f"Failed to write query cache: {e}")
             return
 
+    def get_cache_file_timestamp(self, data_source: str, sql: str, info) -> int | None:
+        cache_key = self._generate_cache_key(data_source, sql, info)
+        op = self._get_dal_operator()
+        for file in op.list("/"):
+            if file.path.startswith(cache_key):
+                # file names look like <cache_key>-1744016574.cache;
+                # we only care about the timestamp part
+                try:
+                    timestamp = int(file.path.split("-")[-1].split(".")[0])
+                    return timestamp
+                except (IndexError, ValueError) as e:
+                    logger.debug(
+                        f"Failed to extract timestamp from cache file {file.path}: {e}"
+                    )
+        return None
+
     def _generate_cache_key(self, data_source: str, sql: str, info) -> str:
         key_parts = [
             data_source,
@@ -65,7 +82,24 @@ def _generate_cache_key(self, data_source: str, sql: str, info) -> str:
         return hashlib.sha256(key_string.encode()).hexdigest()
 
     def _get_cache_file_name(self, cache_key: str) -> str:
-        return f"{cache_key}.cache"
+        op = self._get_dal_operator()
+        for file in op.list("/"):
+            if file.path.startswith(cache_key):
+                return file.path
+
+        cache_create_timestamp = int(time.time() * 1000)
+        return f"{cache_key}-{cache_create_timestamp}.cache"
+
+    def _set_cache_file_name(self, cache_key: str) -> str:
+        # Delete old cache files so each query keeps only one cache file
+        op = self._get_dal_operator()
+        for file in op.list("/"):
+            if file.path.startswith(cache_key):
+                logger.info(f"Deleting old cache file {file.path}")
+                op.delete(file.path)
+
+        cache_create_timestamp = int(time.time() * 1000)
+        return f"{cache_key}-{cache_create_timestamp}.cache"
 
     def _get_full_path(self, path: str) -> str:
         return self.root + path
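Note: the naming scheme above encodes the creation time into the file name itself, so the timestamp can be recovered later without a separate metadata store. A minimal sketch of the round trip (the key value below is a hypothetical stand-in for the sha256 digest produced by _generate_cache_key):

import time

cache_key = "0f1e2d3c4b5a6978"  # hypothetical hex digest; real keys come from _generate_cache_key
cache_create_timestamp = int(time.time() * 1000)  # milliseconds since the epoch
file_name = f"{cache_key}-{cache_create_timestamp}.cache"

# get_cache_file_timestamp() recovers the timestamp by taking the text after
# the last "-" and before the ".cache" extension:
recovered = int(file_name.split("-")[-1].split(".")[0])
assert recovered == cache_create_timestamp

This parsing works only because a hex digest never contains "-"; if the key format ever changes, the split logic would need revisiting.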
diff --git a/ibis-server/app/routers/v2/connector.py b/ibis-server/app/routers/v2/connector.py
index 8e931b775..339f344e0 100644
--- a/ibis-server/app/routers/v2/connector.py
+++ b/ibis-server/app/routers/v2/connector.py
@@ -43,6 +43,7 @@ async def query(
     dto: QueryDTO,
     dry_run: Annotated[bool, Query(alias="dryRun")] = False,
     cache_enable: Annotated[bool, Query(alias="cacheEnable")] = False,
+    override_cache: Annotated[bool, Query(alias="overrideCache")] = False,
     limit: int | None = None,
     java_engine_connector: JavaEngineConnector = Depends(get_java_engine_connector),
     query_cache_manager: QueryCacheManager = Depends(get_query_cache_manager),
@@ -84,25 +85,59 @@ async def query(
 
         if cache_enable:
             cached_result = query_cache_manager.get(
-                str(data_source), dto.sql, dto.connection_info
+                data_source, dto.sql, dto.connection_info
             )
             cache_hit = cached_result is not None
 
-        if cache_hit:
-            span.add_event("cache hit")
-            response = ORJSONResponse(to_json(cached_result))
-            response.headers["X-Cache-Hit"] = str(cache_hit).lower()
-            return response
-        else:
-            result = connector.query(rewritten_sql, limit=limit)
-            if cache_enable:
+        match (cache_enable, cache_hit, override_cache):
+            # case 1: cache hit, serve the cached result
+            case (True, True, False):
+                span.add_event("cache hit")
+                response = ORJSONResponse(to_json(cached_result))
+                response.headers["X-Cache-Hit"] = "true"
+                response.headers["X-Cache-Create-At"] = str(
+                    query_cache_manager.get_cache_file_timestamp(
+                        data_source, dto.sql, dto.connection_info
+                    )
+                )
+            # case 2: cache hit, but the client asked to override the cache
+            case (True, True, True):
+                result = connector.query(rewritten_sql, limit=limit)
+                response = ORJSONResponse(to_json(result))
+                # since the cache is being overridden, report the cache hit as false
+                response.headers["X-Cache-Hit"] = "false"
+                response.headers["X-Cache-Create-At"] = str(
+                    query_cache_manager.get_cache_file_timestamp(
+                        data_source, dto.sql, dto.connection_info
+                    )
+                )
+                query_cache_manager.set(
+                    data_source, dto.sql, result, dto.connection_info
+                )
+                response.headers["X-Cache-Override"] = "true"
+                response.headers["X-Cache-Override-At"] = str(
+                    query_cache_manager.get_cache_file_timestamp(
+                        data_source, dto.sql, dto.connection_info
+                    )
+                )
+            # cases 3 and 4: cache miss (the first cached read has to create the cache);
+            # whether override is requested or not, we create the cache
+            case (True, False, _):
+                result = connector.query(rewritten_sql, limit=limit)
+
+                # set cache
                 query_cache_manager.set(
                     data_source, dto.sql, result, dto.connection_info
                 )
 
+                response = ORJSONResponse(to_json(result))
+                response.headers["X-Cache-Hit"] = "false"
+            # cases 5-8: everything else (cache is not enabled)
+            case (False, _, _):
+                result = connector.query(rewritten_sql, limit=limit)
+                response = ORJSONResponse(to_json(result))
+                response.headers["X-Cache-Hit"] = "false"
-            response = ORJSONResponse(to_json(result))
-            response.headers["X-Cache-Hit"] = str(cache_hit).lower()
-            return response
+        return response
 
 
 @router.post("/{data_source}/validate/{rule_name}", deprecated=True)
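Note: from a client's point of view, the new flag is just another query parameter. A hedged sketch of a call against this endpoint follows; the host, port, route shape, and payload values are assumptions for illustration, not taken from this diff:

import httpx

payload = {
    "connectionInfo": {"host": "localhost", "port": 5432},  # placeholder values
    "manifestStr": "<base64-encoded manifest>",             # placeholder
    "sql": 'SELECT * FROM "Orders" LIMIT 10',
}
resp = httpx.post(
    "http://localhost:8000/v2/connector/postgres/query",  # assumed route shape
    params={"cacheEnable": "true", "overrideCache": "true"},
    json=payload,
)
print(resp.headers.get("X-Cache-Hit"))          # "false": the result was re-queried
print(resp.headers.get("X-Cache-Override"))     # "true": an existing cache entry was replaced
print(resp.headers.get("X-Cache-Override-At"))  # millisecond timestamp of the new cache file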
diff --git a/ibis-server/app/routers/v3/connector.py b/ibis-server/app/routers/v3/connector.py
index c1934c381..5847ad9f2 100644
--- a/ibis-server/app/routers/v3/connector.py
+++ b/ibis-server/app/routers/v3/connector.py
@@ -38,6 +38,7 @@ async def query(
     dto: QueryDTO,
     dry_run: Annotated[bool, Query(alias="dryRun")] = False,
     cache_enable: Annotated[bool, Query(alias="cacheEnable")] = False,
+    override_cache: Annotated[bool, Query(alias="overrideCache")] = False,
     limit: int | None = None,
     headers: Annotated[str | None, Header()] = None,
     java_engine_connector: JavaEngineConnector = Depends(get_java_engine_connector),
@@ -69,24 +70,59 @@ async def query(
 
         if cache_enable:
             cached_result = query_cache_manager.get(
-                str(data_source), dto.sql, dto.connection_info
+                data_source, dto.sql, dto.connection_info
            )
             cache_hit = cached_result is not None
 
-        if cache_hit:
-            span.add_event("cache hit")
-            response = ORJSONResponse(to_json(cached_result))
-            response.headers["X-Cache-Hit"] = str(cache_hit).lower()
-            return response
-        else:
-            result = connector.query(rewritten_sql, limit=limit)
-            if cache_enable:
+        match (cache_enable, cache_hit, override_cache):
+            # case 1: cache hit, serve the cached result
+            case (True, True, False):
+                span.add_event("cache hit")
+                response = ORJSONResponse(to_json(cached_result))
+                response.headers["X-Cache-Hit"] = "true"
+                response.headers["X-Cache-Create-At"] = str(
+                    query_cache_manager.get_cache_file_timestamp(
+                        data_source, dto.sql, dto.connection_info
+                    )
+                )
+            # case 2: cache hit, but the client asked to override the cache
+            case (True, True, True):
+                result = connector.query(rewritten_sql, limit=limit)
+                response = ORJSONResponse(to_json(result))
+                # since the cache is being overridden, report the cache hit as false
+                response.headers["X-Cache-Hit"] = "false"
+                response.headers["X-Cache-Create-At"] = str(
+                    query_cache_manager.get_cache_file_timestamp(
+                        data_source, dto.sql, dto.connection_info
+                    )
+                )
+                query_cache_manager.set(
+                    data_source, dto.sql, result, dto.connection_info
+                )
+                response.headers["X-Cache-Override"] = "true"
+                response.headers["X-Cache-Override-At"] = str(
+                    query_cache_manager.get_cache_file_timestamp(
+                        data_source, dto.sql, dto.connection_info
+                    )
+                )
+            # cases 3 and 4: cache miss (the first cached read has to create the cache);
+            # whether override is requested or not, we create the cache
+            case (True, False, _):
+                result = connector.query(rewritten_sql, limit=limit)
+
+                # set cache
                 query_cache_manager.set(
                     data_source, dto.sql, result, dto.connection_info
                 )
-            response = ORJSONResponse(to_json(result))
-            response.headers["X-Cache-Hit"] = str(cache_hit).lower()
-            return response
+                response = ORJSONResponse(to_json(result))
+                response.headers["X-Cache-Hit"] = "false"
+            # cases 5-8: everything else (cache is not enabled)
+            case (False, _, _):
+                result = connector.query(rewritten_sql, limit=limit)
+                response = ORJSONResponse(to_json(result))
+                response.headers["X-Cache-Hit"] = "false"
+
+        return response
     except Exception as e:
         logger.warning(
             "Failed to execute v3 query, fallback to v2: {}\n" + MIGRATION_MESSAGE,
@@ -97,6 +133,7 @@ async def query(
             dto,
             dry_run,
             cache_enable,
+            override_cache,
             limit,
             java_engine_connector,
             query_cache_manager,
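Note: the three-flag match statement partitions all eight combinations of (cache_enable, cache_hit, override_cache) into the four behaviors the comments label cases 1, 2, 3-4, and 5-8. A standalone sketch that enumerates the dispatch; the behavior strings are mine, not from the source:

from itertools import product

def dispatch(cache_enable: bool, cache_hit: bool, override_cache: bool) -> str:
    # Mirrors the match statement used in both the v2 and v3 routers.
    match (cache_enable, cache_hit, override_cache):
        case (True, True, False):
            return "case 1: serve the cached result"
        case (True, True, True):
            return "case 2: re-query and overwrite the cache"
        case (True, False, _):
            return "cases 3-4: query and create the cache"
        case (False, _, _):
            return "cases 5-8: query, no caching"

for flags in product([True, False], repeat=3):
    print(flags, "->", dispatch(*flags))

Every branch except case 1 re-executes the query, and X-Cache-Hit is "true" only in case 1.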
diff --git a/ibis-server/tests/routers/v2/connector/test_postgres.py b/ibis-server/tests/routers/v2/connector/test_postgres.py
index 6c2b24546..014862dce 100644
--- a/ibis-server/tests/routers/v2/connector/test_postgres.py
+++ b/ibis-server/tests/routers/v2/connector/test_postgres.py
@@ -192,6 +192,7 @@ async def test_query_with_cache(client, manifest_str, postgres: PostgresContainer):
     )
     assert response2.status_code == 200
     assert response2.headers["X-Cache-Hit"] == "true"
+    assert int(response2.headers["X-Cache-Create-At"]) > 1743984000  # 2025-04-07 UTC
     result2 = response2.json()
 
     # Verify results are identical
@@ -200,6 +201,37 @@ async def test_query_with_cache(client, manifest_str, postgres: PostgresContainer):
     assert result1["dtypes"] == result2["dtypes"]
 
 
+async def test_query_with_cache_override(
+    client, manifest_str, postgres: PostgresContainer
+):
+    connection_info = _to_connection_info(postgres)
+    # First request - should miss the cache, then create it
+    response1 = await client.post(
+        url=f"{base_url}/query?cacheEnable=true",  # Enable cache
+        json={
+            "connectionInfo": connection_info,
+            "manifestStr": manifest_str,
+            "sql": 'SELECT * FROM "Orders" LIMIT 10',
+        },
+    )
+    assert response1.status_code == 200
+
+    # Second request with the same SQL - should hit the cache and override it
+    response2 = await client.post(
+        url=f"{base_url}/query?cacheEnable=true&overrideCache=true",  # Override the cache
+        json={
+            "connectionInfo": connection_info,
+            "manifestStr": manifest_str,
+            "sql": 'SELECT * FROM "Orders" LIMIT 10',
+        },
+    )
+    assert response2.status_code == 200
+    assert response2.headers["X-Cache-Override"] == "true"
+    assert int(response2.headers["X-Cache-Override-At"]) > int(
+        response2.headers["X-Cache-Create-At"]
+    )
+
+
 async def test_query_with_connection_url(
     client, manifest_str, postgres: PostgresContainer
 ):
diff --git a/ibis-server/tests/routers/v3/connector/postgres/test_fallback_v2.py b/ibis-server/tests/routers/v3/connector/postgres/test_fallback_v2.py
index 883a382b6..aa68593e5 100644
--- a/ibis-server/tests/routers/v3/connector/postgres/test_fallback_v2.py
+++ b/ibis-server/tests/routers/v3/connector/postgres/test_fallback_v2.py
@@ -71,6 +71,7 @@ async def test_query_with_cache(client, manifest_str, connection_info):
 
     assert response2.status_code == 200
     assert response2.headers["X-Cache-Hit"] == "true"
+    assert int(response2.headers["X-Cache-Create-At"]) > 1743984000  # 2025-04-07 UTC
     result2 = response2.json()
 
     # Verify results are identical
@@ -79,6 +80,34 @@ async def test_query_with_cache(client, manifest_str, connection_info):
     assert result1["dtypes"] == result2["dtypes"]
 
 
+async def test_query_with_cache_override(client, manifest_str, connection_info):
+    # First request - should miss the cache, then create it
+    response1 = await client.post(
+        url=f"{base_url}/query?cacheEnable=true",  # Enable cache
+        json={
+            "connectionInfo": connection_info,
+            "manifestStr": manifest_str,
+            "sql": "SELECT orderkey FROM orders LIMIT 1",
+        },
+    )
+    assert response1.status_code == 200
+
+    # Second request with the same SQL - should hit the cache and override it
+    response2 = await client.post(
+        url=f"{base_url}/query?cacheEnable=true&overrideCache=true",  # Override the cache
+        json={
+            "connectionInfo": connection_info,
+            "manifestStr": manifest_str,
+            "sql": "SELECT orderkey FROM orders LIMIT 1",
+        },
+    )
+    assert response2.status_code == 200
+    assert response2.headers["X-Cache-Override"] == "true"
+    assert int(response2.headers["X-Cache-Override-At"]) > int(
+        response2.headers["X-Cache-Create-At"]
+    )
+
+
 async def test_dry_run(client, manifest_str, connection_info):
     response = await client.post(
         url=f"{base_url}/query",
diff --git a/ibis-server/tests/routers/v3/connector/postgres/test_query.py b/ibis-server/tests/routers/v3/connector/postgres/test_query.py
index dd7afd888..c38c63401 100644
--- a/ibis-server/tests/routers/v3/connector/postgres/test_query.py
+++ b/ibis-server/tests/routers/v3/connector/postgres/test_query.py
@@ -163,6 +163,7 @@ async def test_query_with_cache(client, manifest_str, connection_info):
 
     assert response2.status_code == 200
     assert response2.headers["X-Cache-Hit"] == "true"
+    assert int(response2.headers["X-Cache-Create-At"]) > 1743984000  # 2025-04-07 UTC
     result2 = response2.json()
 
     assert result1["data"] == result2["data"]
@@ -170,6 +171,34 @@ async def test_query_with_cache(client, manifest_str, connection_info):
     assert result1["dtypes"] == result2["dtypes"]
 
 
+async def test_query_with_cache_override(client, manifest_str, connection_info):
+    # First request - should miss the cache, then create it
+    response1 = await client.post(
+        url=f"{base_url}/query?cacheEnable=true",  # Enable cache
+        json={
+            "connectionInfo": connection_info,
+            "manifestStr": manifest_str,
+            "sql": "SELECT * FROM wren.public.orders LIMIT 1",
+        },
+    )
+    assert response1.status_code == 200
+
+    # Second request with the same SQL - should hit the cache and override it
+    response2 = await client.post(
+        url=f"{base_url}/query?cacheEnable=true&overrideCache=true",  # Override the cache
+        json={
+            "connectionInfo": connection_info,
+            "manifestStr": manifest_str,
+            "sql": "SELECT * FROM wren.public.orders LIMIT 1",
+        },
+    )
+    assert response2.status_code == 200
+    assert response2.headers["X-Cache-Override"] == "true"
+    assert int(response2.headers["X-Cache-Override-At"]) > int(
+        response2.headers["X-Cache-Create-At"]
+    )
+
+
 async def test_query_with_connection_url(client, manifest_str, connection_url):
     response = await client.post(
         url=f"{base_url}/query",
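Note on the threshold used in the X-Cache-Create-At assertions: 1743984000 is the epoch-seconds value for 2025-04-07 00:00:00 UTC, while the header carries epoch milliseconds (int(time.time() * 1000)), so any freshly written cache file clears the bound by roughly three orders of magnitude. A quick check:

from datetime import datetime, timezone

print(datetime.fromtimestamp(1743984000, tz=timezone.utc))  # 2025-04-07 00:00:00+00:00
print(int(datetime.now(tz=timezone.utc).timestamp() * 1000) > 1743984000)  # True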