Skip to content

Commit c1d19e8

Browse files
authored
feat(ibis): override query cache (#1138)
1 parent a234d01 commit c1d19e8

File tree

6 files changed

+222
-26
lines changed

6 files changed

+222
-26
lines changed

ibis-server/app/query_cache/__init__.py

+36-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import hashlib
2+
import time
23
from typing import Any, Optional
34

45
import ibis
@@ -37,7 +38,7 @@ def get(self, data_source: str, sql: str, info) -> Optional[Any]:
3738
@tracer.start_as_current_span("set_cache", kind=trace.SpanKind.INTERNAL)
3839
def set(self, data_source: str, sql: str, result: Any, info) -> None:
3940
cache_key = self._generate_cache_key(data_source, sql, info)
40-
cache_file_name = self._get_cache_file_name(cache_key)
41+
cache_file_name = self._set_cache_file_name(cache_key)
4142
op = self._get_dal_operator()
4243
full_path = self._get_full_path(cache_file_name)
4344

@@ -52,6 +53,22 @@ def set(self, data_source: str, sql: str, result: Any, info) -> None:
5253
logger.debug(f"Failed to write query cache: {e}")
5354
return
5455

56+
def get_cache_file_timestamp(self, data_source: str, sql: str, info) -> int | None:
57+
cache_key = self._generate_cache_key(data_source, sql, info)
58+
op = self._get_dal_operator()
59+
for file in op.list("/"):
60+
if file.path.startswith(cache_key):
61+
# xxxxxxxxxxxxxx-1744016574.cache
62+
# we only care about the timestamp part
63+
try:
64+
timestamp = int(file.path.split("-")[-1].split(".")[0])
65+
return timestamp
66+
except (IndexError, ValueError) as e:
67+
logger.debug(
68+
f"Failed to extract timestamp from cache file {file.path}: {e}"
69+
)
70+
return None
71+
5572
def _generate_cache_key(self, data_source: str, sql: str, info) -> str:
5673
key_parts = [
5774
data_source,
@@ -65,7 +82,24 @@ def _generate_cache_key(self, data_source: str, sql: str, info) -> str:
6582
return hashlib.sha256(key_string.encode()).hexdigest()
6683

6784
def _get_cache_file_name(self, cache_key: str) -> str:
    """Resolve the file name used to read the cache entry for ``cache_key``.

    The storage root is listed and the path of the first entry that starts
    with ``cache_key`` is returned unchanged. When no such entry exists, a
    new name of the form ``<cache_key>-<milliseconds>.cache`` is built from
    the current time; this method itself does not create any file.

    Args:
        cache_key: Key of the query whose cache file name is wanted.

    Returns:
        The existing entry's path, or a freshly stamped candidate name.
    """
    op = self._get_dal_operator()
    existing_path = next(
        (entry.path for entry in op.list("/") if entry.path.startswith(cache_key)),
        None,
    )
    if existing_path is not None:
        return existing_path

    # No entry yet for this key: produce the timestamped name format.
    cache_create_timestamp = int(time.time() * 1000)
    return f"{cache_key}-{cache_create_timestamp}.cache"
92+
93+
def _set_cache_file_name(self, cache_key: str) -> str:
    """Remove existing cache files for ``cache_key`` and name the new one.

    Every entry under the storage root whose path starts with ``cache_key``
    is deleted so that only one cache file remains per query, then a new
    name of the form ``<cache_key>-<milliseconds>.cache`` is returned. The
    new file itself is not written here.

    Args:
        cache_key: Key of the query whose cache entry is being replaced.

    Returns:
        The file name to use for the new cache entry.
    """
    op = self._get_dal_operator()
    # Collect the matching paths before deleting anything: removing entries
    # while the listing is still being iterated can make it skip entries.
    stale_paths = [
        entry.path for entry in op.list("/") if entry.path.startswith(cache_key)
    ]
    for stale_path in stale_paths:
        logger.info(f"Deleting old cache file {stale_path}")
        op.delete(stale_path)

    cache_create_timestamp = int(time.time() * 1000)
    return f"{cache_key}-{cache_create_timestamp}.cache"
69103

70104
def _get_full_path(self, path: str) -> str:
71105
return self.root + path

ibis-server/app/routers/v2/connector.py

+47-12
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ async def query(
4343
dto: QueryDTO,
4444
dry_run: Annotated[bool, Query(alias="dryRun")] = False,
4545
cache_enable: Annotated[bool, Query(alias="cacheEnable")] = False,
46+
override_cache: Annotated[bool, Query(alias="overrideCache")] = False,
4647
limit: int | None = None,
4748
java_engine_connector: JavaEngineConnector = Depends(get_java_engine_connector),
4849
query_cache_manager: QueryCacheManager = Depends(get_query_cache_manager),
@@ -84,25 +85,59 @@ async def query(
8485

8586
if cache_enable:
8687
cached_result = query_cache_manager.get(
87-
str(data_source), dto.sql, dto.connection_info
88+
data_source, dto.sql, dto.connection_info
8889
)
8990
cache_hit = cached_result is not None
9091

91-
if cache_hit:
92-
span.add_event("cache hit")
93-
response = ORJSONResponse(to_json(cached_result))
94-
response.headers["X-Cache-Hit"] = str(cache_hit).lower()
95-
return response
96-
else:
97-
result = connector.query(rewritten_sql, limit=limit)
98-
if cache_enable:
92+
match (cache_enable, cache_hit, override_cache):
93+
# case 1 cache hit read
94+
case (True, True, False):
95+
span.add_event("cache hit")
96+
response = ORJSONResponse(to_json(cached_result))
97+
response.headers["X-Cache-Hit"] = "true"
98+
response.headers["X-Cache-Create-At"] = str(
99+
query_cache_manager.get_cache_file_timestamp(
100+
data_source, dto.sql, dto.connection_info
101+
)
102+
)
103+
# case 2 cache hit but override cache
104+
case (True, True, True):
105+
result = connector.query(rewritten_sql, limit=limit)
106+
response = ORJSONResponse(to_json(result))
107+
# because we override the cache, so we need to set the cache hit to false
108+
response.headers["X-Cache-Hit"] = "false"
109+
response.headers["X-Cache-Create-At"] = str(
110+
query_cache_manager.get_cache_file_timestamp(
111+
data_source, dto.sql, dto.connection_info
112+
)
113+
)
114+
query_cache_manager.set(
115+
data_source, dto.sql, result, dto.connection_info
116+
)
117+
response.headers["X-Cache-Override"] = "true"
118+
response.headers["X-Cache-Override-At"] = str(
119+
query_cache_manager.get_cache_file_timestamp(
120+
data_source, dto.sql, dto.connection_info
121+
)
122+
)
123+
# case 3 and case 4 cache miss read (first time cache read need to create cache)
124+
# no matter the cache override or not, we need to create cache
125+
case (True, False, _):
126+
result = connector.query(rewritten_sql, limit=limit)
127+
128+
# set cache
99129
query_cache_manager.set(
100130
data_source, dto.sql, result, dto.connection_info
101131
)
132+
response = ORJSONResponse(to_json(result))
133+
response.headers["X-Cache-Hit"] = "false"
134+
# case 5~8 Other cases (cache is not enabled)
135+
case (False, _, _):
136+
result = connector.query(rewritten_sql, limit=limit)
137+
response = ORJSONResponse(to_json(result))
138+
response.headers["X-Cache-Hit"] = "false"
102139

103-
response = ORJSONResponse(to_json(result))
104-
response.headers["X-Cache-Hit"] = str(cache_hit).lower()
105-
return response
140+
return response
106141

107142

108143
@router.post("/{data_source}/validate/{rule_name}", deprecated=True)

ibis-server/app/routers/v3/connector.py

+49-12
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ async def query(
3838
dto: QueryDTO,
3939
dry_run: Annotated[bool, Query(alias="dryRun")] = False,
4040
cache_enable: Annotated[bool, Query(alias="cacheEnable")] = False,
41+
override_cache: Annotated[bool, Query(alias="overrideCache")] = False,
4142
limit: int | None = None,
4243
headers: Annotated[str | None, Header()] = None,
4344
java_engine_connector: JavaEngineConnector = Depends(get_java_engine_connector),
@@ -69,24 +70,59 @@ async def query(
6970

7071
if cache_enable:
7172
cached_result = query_cache_manager.get(
72-
str(data_source), dto.sql, dto.connection_info
73+
data_source, dto.sql, dto.connection_info
7374
)
7475
cache_hit = cached_result is not None
7576

76-
if cache_hit:
77-
span.add_event("cache hit")
78-
response = ORJSONResponse(to_json(cached_result))
79-
response.headers["X-Cache-Hit"] = str(cache_hit).lower()
80-
return response
81-
else:
82-
result = connector.query(rewritten_sql, limit=limit)
83-
if cache_enable:
77+
match (cache_enable, cache_hit, override_cache):
78+
# case 1 cache hit read
79+
case (True, True, False):
80+
span.add_event("cache hit")
81+
response = ORJSONResponse(to_json(cached_result))
82+
response.headers["X-Cache-Hit"] = "true"
83+
response.headers["X-Cache-Create-At"] = str(
84+
query_cache_manager.get_cache_file_timestamp(
85+
data_source, dto.sql, dto.connection_info
86+
)
87+
)
88+
# case 2 cache hit but override cache
89+
case (True, True, True):
90+
result = connector.query(rewritten_sql, limit=limit)
91+
response = ORJSONResponse(to_json(result))
92+
# because we override the cache, so we need to set the cache hit to false
93+
response.headers["X-Cache-Hit"] = "false"
94+
response.headers["X-Cache-Create-At"] = str(
95+
query_cache_manager.get_cache_file_timestamp(
96+
data_source, dto.sql, dto.connection_info
97+
)
98+
)
99+
query_cache_manager.set(
100+
data_source, dto.sql, result, dto.connection_info
101+
)
102+
response.headers["X-Cache-Override"] = "true"
103+
response.headers["X-Cache-Override-At"] = str(
104+
query_cache_manager.get_cache_file_timestamp(
105+
data_source, dto.sql, dto.connection_info
106+
)
107+
)
108+
# case 3 and case 4 cache miss read (first time cache read need to create cache)
109+
# no matter the cache override or not, we need to create cache
110+
case (True, False, _):
111+
result = connector.query(rewritten_sql, limit=limit)
112+
113+
# set cache
84114
query_cache_manager.set(
85115
data_source, dto.sql, result, dto.connection_info
86116
)
87-
response = ORJSONResponse(to_json(result))
88-
response.headers["X-Cache-Hit"] = str(cache_hit).lower()
89-
return response
117+
response = ORJSONResponse(to_json(result))
118+
response.headers["X-Cache-Hit"] = "false"
119+
# case 5~8 Other cases (cache is not enabled)
120+
case (False, _, _):
121+
result = connector.query(rewritten_sql, limit=limit)
122+
response = ORJSONResponse(to_json(result))
123+
response.headers["X-Cache-Hit"] = "false"
124+
125+
return response
90126
except Exception as e:
91127
logger.warning(
92128
"Failed to execute v3 query, fallback to v2: {}\n" + MIGRATION_MESSAGE,
@@ -97,6 +133,7 @@ async def query(
97133
dto,
98134
dry_run,
99135
cache_enable,
136+
override_cache,
100137
limit,
101138
java_engine_connector,
102139
query_cache_manager,

ibis-server/tests/routers/v2/connector/test_postgres.py

+32
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ async def test_query_with_cache(client, manifest_str, postgres: PostgresContaine
192192
)
193193
assert response2.status_code == 200
194194
assert response2.headers["X-Cache-Hit"] == "true"
195+
assert int(response2.headers["X-Cache-Create-At"]) > 1743984000 # 2025.04.07
195196
result2 = response2.json()
196197

197198
# Verify results are identical
@@ -200,6 +201,37 @@ async def test_query_with_cache(client, manifest_str, postgres: PostgresContaine
200201
assert result1["dtypes"] == result2["dtypes"]
201202

202203

204+
async def test_query_with_cache_override(
    client, manifest_str, postgres: PostgresContainer
):
    """Overriding an existing cache entry reports the override and a newer timestamp."""
    payload = {
        "connectionInfo": _to_connection_info(postgres),
        "manifestStr": manifest_str,
        "sql": 'SELECT * FROM "Orders" LIMIT 10',
    }

    # Cache-enabled request, so that a cache entry exists for this SQL.
    seeded = await client.post(
        url=f"{base_url}/query?cacheEnable=true",
        json=payload,
    )
    assert seeded.status_code == 200

    # Same SQL again, this time asking the server to override the entry.
    overridden = await client.post(
        url=f"{base_url}/query?cacheEnable=true&overrideCache=true",
        json=payload,
    )
    assert overridden.status_code == 200

    headers = overridden.headers
    assert headers["X-Cache-Override"] == "true"
    # The rewritten entry must carry a later stamp than the one it replaced.
    assert int(headers["X-Cache-Override-At"]) > int(headers["X-Cache-Create-At"])
233+
234+
203235
async def test_query_with_connection_url(
204236
client, manifest_str, postgres: PostgresContainer
205237
):

ibis-server/tests/routers/v3/connector/postgres/test_fallback_v2.py

+29
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ async def test_query_with_cache(client, manifest_str, connection_info):
7171

7272
assert response2.status_code == 200
7373
assert response2.headers["X-Cache-Hit"] == "true"
74+
assert int(response2.headers["X-Cache-Create-At"]) > 1743984000 # 2025.04.07
7475
result2 = response2.json()
7576

7677
# Verify results are identical
@@ -79,6 +80,34 @@ async def test_query_with_cache(client, manifest_str, connection_info):
7980
assert result1["dtypes"] == result2["dtypes"]
8081

8182

83+
async def test_query_with_cache_override(client, manifest_str, connection_info):
    """Overriding an existing cache entry reports the override and a newer timestamp."""
    payload = {
        "connectionInfo": connection_info,
        "manifestStr": manifest_str,
        "sql": "SELECT orderkey FROM orders LIMIT 1",
    }

    # Cache-enabled request, so that a cache entry exists for this SQL.
    seeded = await client.post(
        url=f"{base_url}/query?cacheEnable=true",
        json=payload,
    )
    assert seeded.status_code == 200

    # Same SQL again, this time asking the server to override the entry.
    overridden = await client.post(
        url=f"{base_url}/query?cacheEnable=true&overrideCache=true",
        json=payload,
    )
    assert overridden.status_code == 200

    headers = overridden.headers
    assert headers["X-Cache-Override"] == "true"
    # The rewritten entry must carry a later stamp than the one it replaced.
    assert int(headers["X-Cache-Override-At"]) > int(headers["X-Cache-Create-At"])
109+
110+
82111
async def test_dry_run(client, manifest_str, connection_info):
83112
response = await client.post(
84113
url=f"{base_url}/query",

ibis-server/tests/routers/v3/connector/postgres/test_query.py

+29
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,42 @@ async def test_query_with_cache(client, manifest_str, connection_info):
163163

164164
assert response2.status_code == 200
165165
assert response2.headers["X-Cache-Hit"] == "true"
166+
assert int(response2.headers["X-Cache-Create-At"]) > 1743984000 # 2025.04.07
166167
result2 = response2.json()
167168

168169
assert result1["data"] == result2["data"]
169170
assert result1["columns"] == result2["columns"]
170171
assert result1["dtypes"] == result2["dtypes"]
171172

172173

174+
async def test_query_with_cache_override(client, manifest_str, connection_info):
    """Overriding an existing cache entry reports the override and a newer timestamp."""
    payload = {
        "connectionInfo": connection_info,
        "manifestStr": manifest_str,
        "sql": "SELECT * FROM wren.public.orders LIMIT 1",
    }

    # Cache-enabled request, so that a cache entry exists for this SQL.
    seeded = await client.post(
        url=f"{base_url}/query?cacheEnable=true",
        json=payload,
    )
    assert seeded.status_code == 200

    # Same SQL again, this time asking the server to override the entry.
    overridden = await client.post(
        url=f"{base_url}/query?cacheEnable=true&overrideCache=true",
        json=payload,
    )
    assert overridden.status_code == 200

    headers = overridden.headers
    assert headers["X-Cache-Override"] == "true"
    # The rewritten entry must carry a later stamp than the one it replaced.
    assert int(headers["X-Cache-Override-At"]) > int(headers["X-Cache-Create-At"])
200+
201+
173202
async def test_query_with_connection_url(client, manifest_str, connection_url):
174203
response = await client.post(
175204
url=f"{base_url}/query",

0 commit comments

Comments
 (0)