Skip to content

feat(ibis): override query cache #1138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions ibis-server/app/query_cache/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import hashlib
import time
from typing import Any, Optional

import ibis
Expand Down Expand Up @@ -37,7 +38,7 @@ def get(self, data_source: str, sql: str, info) -> Optional[Any]:
@tracer.start_as_current_span("set_cache", kind=trace.SpanKind.INTERNAL)
def set(self, data_source: str, sql: str, result: Any, info) -> None:
cache_key = self._generate_cache_key(data_source, sql, info)
cache_file_name = self._get_cache_file_name(cache_key)
cache_file_name = self._set_cache_file_name(cache_key)
op = self._get_dal_operator()
full_path = self._get_full_path(cache_file_name)

Expand All @@ -52,6 +53,22 @@ def set(self, data_source: str, sql: str, result: Any, info) -> None:
logger.debug(f"Failed to write query cache: {e}")
return

def get_cache_file_timestamp(self, data_source: str, sql: str, info) -> int | None:
cache_key = self._generate_cache_key(data_source, sql, info)
op = self._get_dal_operator()
for file in op.list("/"):
if file.path.startswith(cache_key):
# xxxxxxxxxxxxxx-1744016574.cache
# we only care about the timestamp part
try:
timestamp = int(file.path.split("-")[-1].split(".")[0])
return timestamp
except (IndexError, ValueError) as e:
logger.debug(
f"Failed to extract timestamp from cache file {file.path}: {e}"
)
return None

def _generate_cache_key(self, data_source: str, sql: str, info) -> str:
key_parts = [
data_source,
Expand All @@ -65,7 +82,24 @@ def _generate_cache_key(self, data_source: str, sql: str, info) -> str:
return hashlib.sha256(key_string.encode()).hexdigest()

def _get_cache_file_name(self, cache_key: str) -> str:
return f"{cache_key}.cache"
op = self._get_dal_operator()
for file in op.list("/"):
if file.path.startswith(cache_key):
return file.path

cache_create_timestamp = int(time.time() * 1000)
return f"{cache_key}-{cache_create_timestamp}.cache"

def _set_cache_file_name(self, cache_key: str) -> str:
# Delete old cache files, make only one cache file per query
op = self._get_dal_operator()
for file in op.list("/"):
if file.path.startswith(cache_key):
logger.info(f"Deleting old cache file {file.path}")
op.delete(file.path)

cache_create_timestamp = int(time.time() * 1000)
return f"{cache_key}-{cache_create_timestamp}.cache"

def _get_full_path(self, path: str) -> str:
return self.root + path
Expand Down
59 changes: 47 additions & 12 deletions ibis-server/app/routers/v2/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ async def query(
dto: QueryDTO,
dry_run: Annotated[bool, Query(alias="dryRun")] = False,
cache_enable: Annotated[bool, Query(alias="cacheEnable")] = False,
override_cache: Annotated[bool, Query(alias="overrideCache")] = False,
limit: int | None = None,
java_engine_connector: JavaEngineConnector = Depends(get_java_engine_connector),
query_cache_manager: QueryCacheManager = Depends(get_query_cache_manager),
Expand Down Expand Up @@ -84,25 +85,59 @@ async def query(

if cache_enable:
cached_result = query_cache_manager.get(
str(data_source), dto.sql, dto.connection_info
data_source, dto.sql, dto.connection_info
)
cache_hit = cached_result is not None

if cache_hit:
span.add_event("cache hit")
response = ORJSONResponse(to_json(cached_result))
response.headers["X-Cache-Hit"] = str(cache_hit).lower()
return response
else:
result = connector.query(rewritten_sql, limit=limit)
if cache_enable:
match (cache_enable, cache_hit, override_cache):
# case 1 cache hit read
case (True, True, False):
span.add_event("cache hit")
response = ORJSONResponse(to_json(cached_result))
response.headers["X-Cache-Hit"] = "true"
response.headers["X-Cache-Create-At"] = str(
query_cache_manager.get_cache_file_timestamp(
data_source, dto.sql, dto.connection_info
)
)
# case 2 cache hit but override cache
case (True, True, True):
result = connector.query(rewritten_sql, limit=limit)
response = ORJSONResponse(to_json(result))
# Because the cache is being overridden, the response must report a cache miss.
response.headers["X-Cache-Hit"] = "false"
response.headers["X-Cache-Create-At"] = str(
query_cache_manager.get_cache_file_timestamp(
data_source, dto.sql, dto.connection_info
)
)
query_cache_manager.set(
data_source, dto.sql, result, dto.connection_info
)
response.headers["X-Cache-Override"] = "true"
response.headers["X-Cache-Override-At"] = str(
query_cache_manager.get_cache_file_timestamp(
data_source, dto.sql, dto.connection_info
)
)
# cases 3 and 4: cache miss — the first read must create the cache entry,
# regardless of whether overrideCache was requested
case (True, False, _):
result = connector.query(rewritten_sql, limit=limit)

# set cache
query_cache_manager.set(
data_source, dto.sql, result, dto.connection_info
)
response = ORJSONResponse(to_json(result))
response.headers["X-Cache-Hit"] = "false"
# case 5~8 Other cases (cache is not enabled)
case (False, _, _):
result = connector.query(rewritten_sql, limit=limit)
response = ORJSONResponse(to_json(result))
response.headers["X-Cache-Hit"] = "false"

response = ORJSONResponse(to_json(result))
response.headers["X-Cache-Hit"] = str(cache_hit).lower()
return response
return response


@router.post("/{data_source}/validate/{rule_name}", deprecated=True)
Expand Down
61 changes: 49 additions & 12 deletions ibis-server/app/routers/v3/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async def query(
dto: QueryDTO,
dry_run: Annotated[bool, Query(alias="dryRun")] = False,
cache_enable: Annotated[bool, Query(alias="cacheEnable")] = False,
override_cache: Annotated[bool, Query(alias="overrideCache")] = False,
limit: int | None = None,
headers: Annotated[str | None, Header()] = None,
java_engine_connector: JavaEngineConnector = Depends(get_java_engine_connector),
Expand Down Expand Up @@ -69,24 +70,59 @@ async def query(

if cache_enable:
cached_result = query_cache_manager.get(
str(data_source), dto.sql, dto.connection_info
data_source, dto.sql, dto.connection_info
)
cache_hit = cached_result is not None

if cache_hit:
span.add_event("cache hit")
response = ORJSONResponse(to_json(cached_result))
response.headers["X-Cache-Hit"] = str(cache_hit).lower()
return response
else:
result = connector.query(rewritten_sql, limit=limit)
if cache_enable:
match (cache_enable, cache_hit, override_cache):
# case 1 cache hit read
case (True, True, False):
span.add_event("cache hit")
response = ORJSONResponse(to_json(cached_result))
response.headers["X-Cache-Hit"] = "true"
response.headers["X-Cache-Create-At"] = str(
query_cache_manager.get_cache_file_timestamp(
data_source, dto.sql, dto.connection_info
)
)
# case 2 cache hit but override cache
case (True, True, True):
result = connector.query(rewritten_sql, limit=limit)
response = ORJSONResponse(to_json(result))
# Because the cache is being overridden, the response must report a cache miss.
response.headers["X-Cache-Hit"] = "false"
response.headers["X-Cache-Create-At"] = str(
query_cache_manager.get_cache_file_timestamp(
data_source, dto.sql, dto.connection_info
)
)
query_cache_manager.set(
data_source, dto.sql, result, dto.connection_info
)
response.headers["X-Cache-Override"] = "true"
response.headers["X-Cache-Override-At"] = str(
query_cache_manager.get_cache_file_timestamp(
data_source, dto.sql, dto.connection_info
)
)
# cases 3 and 4: cache miss — the first read must create the cache entry,
# regardless of whether overrideCache was requested
case (True, False, _):
result = connector.query(rewritten_sql, limit=limit)

# set cache
query_cache_manager.set(
data_source, dto.sql, result, dto.connection_info
)
response = ORJSONResponse(to_json(result))
response.headers["X-Cache-Hit"] = str(cache_hit).lower()
return response
response = ORJSONResponse(to_json(result))
response.headers["X-Cache-Hit"] = "false"
# case 5~8 Other cases (cache is not enabled)
case (False, _, _):
result = connector.query(rewritten_sql, limit=limit)
response = ORJSONResponse(to_json(result))
response.headers["X-Cache-Hit"] = "false"

return response
except Exception as e:
logger.warning(
"Failed to execute v3 query, fallback to v2: {}\n" + MIGRATION_MESSAGE,
Expand All @@ -97,6 +133,7 @@ async def query(
dto,
dry_run,
cache_enable,
override_cache,
limit,
java_engine_connector,
query_cache_manager,
Expand Down
32 changes: 32 additions & 0 deletions ibis-server/tests/routers/v2/connector/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ async def test_query_with_cache(client, manifest_str, postgres: PostgresContaine
)
assert response2.status_code == 200
assert response2.headers["X-Cache-Hit"] == "true"
assert int(response2.headers["X-Cache-Create-At"]) > 1743984000 # 2025.04.07
result2 = response2.json()

# Verify results are identical
Expand All @@ -200,6 +201,37 @@ async def test_query_with_cache(client, manifest_str, postgres: PostgresContaine
assert result1["dtypes"] == result2["dtypes"]


async def test_query_with_cache_override(
    client, manifest_str, postgres: PostgresContainer
):
    """Overriding a warm cache refreshes it and exposes the new timestamps."""
    connection_info = _to_connection_info(postgres)
    payload = {
        "connectionInfo": connection_info,
        "manifestStr": manifest_str,
        "sql": 'SELECT * FROM "Orders" LIMIT 10',
    }

    # Warm-up request: a cache miss that creates the cache entry.
    first_response = await client.post(
        url=f"{base_url}/query?cacheEnable=true",
        json=payload,
    )
    assert first_response.status_code == 200

    # Same SQL with overrideCache=true: the hit is discarded and rewritten.
    second_response = await client.post(
        url=f"{base_url}/query?cacheEnable=true&overrideCache=true",
        json=payload,
    )
    assert second_response.status_code == 200
    assert second_response.headers["X-Cache-Override"] == "true"
    assert int(second_response.headers["X-Cache-Override-At"]) > int(
        second_response.headers["X-Cache-Create-At"]
    )


async def test_query_with_connection_url(
client, manifest_str, postgres: PostgresContainer
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ async def test_query_with_cache(client, manifest_str, connection_info):

assert response2.status_code == 200
assert response2.headers["X-Cache-Hit"] == "true"
assert int(response2.headers["X-Cache-Create-At"]) > 1743984000 # 2025.04.07
result2 = response2.json()

# Verify results are identical
Expand All @@ -79,6 +80,34 @@ async def test_query_with_cache(client, manifest_str, connection_info):
assert result1["dtypes"] == result2["dtypes"]


async def test_query_with_cache_override(client, manifest_str, connection_info):
    """Overriding a warm cache refreshes it and exposes the new timestamps."""
    payload = {
        "connectionInfo": connection_info,
        "manifestStr": manifest_str,
        "sql": "SELECT orderkey FROM orders LIMIT 1",
    }

    # Warm-up request: a cache miss that creates the cache entry.
    first_response = await client.post(
        url=f"{base_url}/query?cacheEnable=true",
        json=payload,
    )
    assert first_response.status_code == 200

    # Same SQL with overrideCache=true: the hit is discarded and rewritten.
    second_response = await client.post(
        url=f"{base_url}/query?cacheEnable=true&overrideCache=true",
        json=payload,
    )
    assert second_response.status_code == 200
    assert second_response.headers["X-Cache-Override"] == "true"
    assert int(second_response.headers["X-Cache-Override-At"]) > int(
        second_response.headers["X-Cache-Create-At"]
    )


async def test_dry_run(client, manifest_str, connection_info):
response = await client.post(
url=f"{base_url}/query",
Expand Down
29 changes: 29 additions & 0 deletions ibis-server/tests/routers/v3/connector/postgres/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,42 @@ async def test_query_with_cache(client, manifest_str, connection_info):

assert response2.status_code == 200
assert response2.headers["X-Cache-Hit"] == "true"
assert int(response2.headers["X-Cache-Create-At"]) > 1743984000 # 2025.04.07
result2 = response2.json()

assert result1["data"] == result2["data"]
assert result1["columns"] == result2["columns"]
assert result1["dtypes"] == result2["dtypes"]


async def test_query_with_cache_override(client, manifest_str, connection_info):
    """Overriding a warm cache refreshes it and exposes the new timestamps."""
    payload = {
        "connectionInfo": connection_info,
        "manifestStr": manifest_str,
        "sql": "SELECT * FROM wren.public.orders LIMIT 1",
    }

    # Warm-up request: a cache miss that creates the cache entry.
    first_response = await client.post(
        url=f"{base_url}/query?cacheEnable=true",
        json=payload,
    )
    assert first_response.status_code == 200

    # Same SQL with overrideCache=true: the hit is discarded and rewritten.
    second_response = await client.post(
        url=f"{base_url}/query?cacheEnable=true&overrideCache=true",
        json=payload,
    )
    assert second_response.status_code == 200
    assert second_response.headers["X-Cache-Override"] == "true"
    assert int(second_response.headers["X-Cache-Override-At"]) > int(
        second_response.headers["X-Cache-Create-At"]
    )


async def test_query_with_connection_url(client, manifest_str, connection_url):
response = await client.post(
url=f"{base_url}/query",
Expand Down