Skip to content

Commit 3a7111b

Browse files
committed
use opendal to unify fs access
1 parent cb10f96 commit 3a7111b

File tree

3 files changed

+29
-57
lines changed

3 files changed

+29
-57
lines changed
+27-13
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,32 @@
11
import hashlib
2-
import os
32
from typing import Any, Optional
43

54
import ibis
5+
import opendal
66
from loguru import logger
77
from opentelemetry import trace
88

99
tracer = trace.get_tracer(__name__)
1010

1111

1212
class QueryCacheManager:
13-
def __init__(self):
14-
pass
13+
def __init__(self, root: str = "/tmp/wren-engine/"):
14+
self.root = root
1515

1616
@tracer.start_as_current_span("get_cache", kind=trace.SpanKind.INTERNAL)
1717
def get(self, data_source: str, sql: str, info) -> Optional[Any]:
1818
cache_key = self._generate_cache_key(data_source, sql, info)
19-
cache_path = self._get_cache_path(cache_key)
19+
cache_file_name = self._get_cache_file_name(cache_key)
20+
op = self._get_dal_operator()
21+
full_path = self._get_full_path(cache_file_name)
2022

2123
# Check if cache file exists
22-
if os.path.exists(cache_path):
24+
if op.exists(cache_file_name):
2325
try:
24-
cache = ibis.read_parquet(cache_path)
26+
logger.info(f"\nReading query cache {cache_file_name}\n")
27+
cache = ibis.read_parquet(full_path)
2528
df = cache.execute()
29+
logger.info("\nquery cache to dataframe\n")
2630
return df
2731
except Exception as e:
2832
logger.debug(f"Failed to read query cache {e}")
@@ -33,14 +37,17 @@ def get(self, data_source: str, sql: str, info) -> Optional[Any]:
3337
@tracer.start_as_current_span("set_cache", kind=trace.SpanKind.INTERNAL)
3438
def set(self, data_source: str, sql: str, result: Any, info) -> None:
3539
cache_key = self._generate_cache_key(data_source, sql, info)
36-
cache_path = self._get_cache_path(cache_key)
40+
cache_file_name = self._get_cache_file_name(cache_key)
41+
op = self._get_dal_operator()
42+
full_path = self._get_full_path(cache_file_name)
3743

3844
try:
3945
# Create cache directory if it doesn't exist
40-
os.makedirs(os.path.dirname(cache_path), exist_ok=True)
41-
cache = ibis.memtable(result)
42-
logger.info(f"\nWriting query cache to {cache_path}\n")
43-
cache.to_parquet(cache_path)
46+
with op.open(cache_file_name, mode="wb") as file:
47+
cache = ibis.memtable(result)
48+
logger.info(f"\nWriting query cache to {cache_file_name}\n")
49+
if file.writable():
50+
cache.to_parquet(full_path)
4451
except Exception as e:
4552
logger.debug(f"Failed to write query cache: {e}")
4653
return
@@ -57,5 +64,12 @@ def _generate_cache_key(self, data_source: str, sql: str, info) -> str:
5764

5865
return hashlib.sha256(key_string.encode()).hexdigest()
5966

60-
def _get_cache_path(self, cache_key: str) -> str:
61-
return f"/tmp/wren-engine/{cache_key}.cache"
67+
def _get_cache_file_name(self, cache_key: str) -> str:
68+
return f"{cache_key}.cache"
69+
70+
def _get_full_path(self, path: str) -> str:
71+
return self.root + path
72+
73+
def _get_dal_operator(self) -> Any:
74+
# Default implementation using local filesystem
75+
return opendal.Operator("fs", root=self.root)

ibis-server/tests/routers/v2/connector/test_postgres.py

+1-22
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
import base64
2-
import os
3-
import shutil
42
from urllib.parse import quote_plus, urlparse
53

64
import orjson
@@ -12,7 +10,6 @@
1210
from testcontainers.postgres import PostgresContainer
1311

1412
from app.model.validator import rules
15-
from app.query_cache import QueryCacheManager
1613
from tests.conftest import file_path
1714

1815
pytestmark = pytest.mark.postgres
@@ -127,15 +124,6 @@ def postgres(request) -> PostgresContainer:
127124
return pg
128125

129126

130-
@pytest.fixture(scope="function")
131-
def cache_dir():
132-
temp_dir = "/tmp/wren-engine-test"
133-
os.makedirs(temp_dir, exist_ok=True)
134-
yield temp_dir
135-
# Clean up after the test
136-
shutil.rmtree(temp_dir, ignore_errors=True)
137-
138-
139127
async def test_query(client, manifest_str, postgres: PostgresContainer):
140128
connection_info = _to_connection_info(postgres)
141129
response = await client.post(
@@ -176,16 +164,7 @@ async def test_query(client, manifest_str, postgres: PostgresContainer):
176164
}
177165

178166

179-
async def test_query_with_cache(
180-
client, manifest_str, postgres: PostgresContainer, cache_dir, monkeypatch
181-
):
182-
# Override the cache path to use our test directory
183-
monkeypatch.setattr(
184-
QueryCacheManager,
185-
"_get_cache_path",
186-
lambda self, key: f"{cache_dir}/{key}.cache",
187-
)
188-
167+
async def test_query_with_cache(client, manifest_str, postgres: PostgresContainer):
189168
connection_info = _to_connection_info(postgres)
190169

191170
# First request - should miss cache

ibis-server/tests/routers/v3/connector/postgres/test_query.py

+1-22
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
import base64
2-
import os
3-
import shutil
42

53
import orjson
64
import pytest
75

8-
from app.query_cache import QueryCacheManager
96
from tests.routers.v3.connector.postgres.conftest import base_url
107

118
manifest = {
@@ -100,15 +97,6 @@ def manifest_str():
10097
return base64.b64encode(orjson.dumps(manifest)).decode("utf-8")
10198

10299

103-
@pytest.fixture(scope="function")
104-
def cache_dir():
105-
temp_dir = "/tmp/wren-engine-test"
106-
os.makedirs(temp_dir, exist_ok=True)
107-
yield temp_dir
108-
# Clean up after the test
109-
shutil.rmtree(temp_dir, ignore_errors=True)
110-
111-
112100
async def test_query(client, manifest_str, connection_info):
113101
response = await client.post(
114102
url=f"{base_url}/query",
@@ -148,16 +136,7 @@ async def test_query(client, manifest_str, connection_info):
148136
}
149137

150138

151-
async def test_query_with_cache(
152-
client, manifest_str, connection_info, cache_dir, monkeypatch
153-
):
154-
# Override the cache path to use our test directory
155-
monkeypatch.setattr(
156-
QueryCacheManager,
157-
"_get_cache_path",
158-
lambda self, key: f"{cache_dir}/{key}.cache",
159-
)
160-
139+
async def test_query_with_cache(client, manifest_str, connection_info):
161140
# First request - should miss cache
162141
response1 = await client.post(
163142
url=f"{base_url}/query?cacheEnable=true", # Enable cache

0 commit comments

Comments
 (0)