Skip to content

feat(ibis): introduce minio connector #1048

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ibis-server/app/mdl/rewriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ def _get_read_dialect(cls, experiment) -> str | None:
def _get_write_dialect(cls, data_source: DataSource) -> str:
if data_source == DataSource.canner:
return "trino"
elif data_source in {DataSource.local_file, DataSource.s3_file}:
elif data_source in {
DataSource.local_file,
DataSource.s3_file,
DataSource.minio_file,
}:
return "duckdb"
return data_source.name

Expand Down
19 changes: 19 additions & 0 deletions ibis-server/app/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ class QueryS3FileDTO(QueryDTO):
connection_info: S3FileConnectionInfo = connection_info_field


class QueryMinioFileDTO(QueryDTO):
    """Query request payload whose connection info targets a MinIO bucket."""

    connection_info: MinioFileConnectionInfo = connection_info_field


class BigQueryConnectionInfo(BaseModel):
project_id: SecretStr
dataset_id: SecretStr
Expand Down Expand Up @@ -162,6 +166,20 @@ class S3FileConnectionInfo(BaseModel):
secret_key: SecretStr


class MinioFileConnectionInfo(BaseModel):
    """Connection settings for reading files from a MinIO bucket.

    ``endpoint``, ``bucket``, ``access_key`` and ``secret_key`` are required;
    ``url``, ``format`` and ``ssl_enabled`` fall back to the defaults below.
    """

    # Defaults are not validated unless requested, so without
    # ``validate_default=True`` an omitted ``url`` would stay a plain ``str``
    # and ``url.get_secret_value()`` would fail for callers that rely on the
    # declared ``SecretStr`` type.
    url: SecretStr = Field(
        description="the root path of the s3 bucket",
        default="/",
        validate_default=True,
    )
    format: str = Field(
        description="File format", default="csv", examples=["csv", "parquet", "json"]
    )
    ssl_enabled: bool = Field(
        description="use the ssl connection or not", default=False
    )
    endpoint: SecretStr
    bucket: SecretStr
    access_key: SecretStr
    secret_key: SecretStr


ConnectionInfo = (
BigQueryConnectionInfo
| CannerConnectionInfo
Expand All @@ -173,6 +191,7 @@ class S3FileConnectionInfo(BaseModel):
| TrinoConnectionInfo
| LocalFileConnectionInfo
| S3FileConnectionInfo
| MinioFileConnectionInfo
)


Expand Down
19 changes: 14 additions & 5 deletions ibis-server/app/model/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@
import ibis.formats
import pandas as pd
import sqlglot.expressions as sge
from duckdb import HTTPException
from duckdb import HTTPException, IOException
from google.cloud import bigquery
from google.oauth2 import service_account
from ibis import BaseBackend
from ibis.backends.sql.compilers.postgres import compiler as postgres_compiler

from app.model import (
ConnectionInfo,
MinioFileConnectionInfo,
S3FileConnectionInfo,
UnknownIbisError,
UnprocessableEntityError,
)
from app.model.data_source import DataSource
from app.model.utils import init_duckdb_s3
from app.model.utils import init_duckdb_minio, init_duckdb_s3

# Override datatypes of ibis
importlib.import_module("app.custom_ibis.backends.sql.datatypes")
Expand All @@ -37,9 +38,11 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo):
self._connector = CannerConnector(connection_info)
elif data_source == DataSource.bigquery:
self._connector = BigQueryConnector(connection_info)
elif data_source == DataSource.local_file:
self._connector = DuckDBConnector(connection_info)
elif data_source == DataSource.s3_file:
elif data_source in {
DataSource.local_file,
DataSource.s3_file,
DataSource.minio_file,
}:
self._connector = DuckDBConnector(connection_info)
else:
self._connector = SimpleConnector(data_source, connection_info)
Expand Down Expand Up @@ -162,16 +165,22 @@ def __init__(self, connection_info: ConnectionInfo):
self.connection = duckdb.connect()
if isinstance(connection_info, S3FileConnectionInfo):
init_duckdb_s3(self.connection, connection_info)
if isinstance(connection_info, MinioFileConnectionInfo):
init_duckdb_minio(self.connection, connection_info)

def query(self, sql: str, limit: int) -> pd.DataFrame:
    """Run ``sql`` on the DuckDB connection and return at most ``limit`` rows.

    Raises:
        UnprocessableEntityError: if DuckDB reports an ``IOException`` or
            ``HTTPException`` while executing the statement. The DuckDB
            error is kept as the cause.
    """
    try:
        return self.connection.execute(sql).fetch_df().head(limit)
    except (IOException, HTTPException) as e:
        # Both failure kinds are reported to the caller identically.
        raise UnprocessableEntityError(f"Failed to execute query: {e!s}") from e

def dry_run(self, sql: str) -> None:
    """Execute ``sql`` on the DuckDB connection and discard the result.

    Raises:
        QueryDryRunError: if DuckDB reports an ``IOException`` or
            ``HTTPException`` while executing the statement. The DuckDB
            error is kept as the cause.
    """
    try:
        self.connection.execute(sql)
    except (IOException, HTTPException) as e:
        # Both failure kinds are reported to the caller identically.
        raise QueryDryRunError(f"Failed to execute query: {e!s}") from e

Expand Down
3 changes: 3 additions & 0 deletions ibis-server/app/model/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
QueryClickHouseDTO,
QueryDTO,
QueryLocalFileDTO,
QueryMinioFileDTO,
QueryMSSqlDTO,
QueryMySqlDTO,
QueryPostgresDTO,
Expand All @@ -46,6 +47,7 @@ class DataSource(StrEnum):
trino = auto()
local_file = auto()
s3_file = auto()
minio_file = auto()

def get_connection(self, info: ConnectionInfo) -> BaseBackend:
try:
Expand All @@ -71,6 +73,7 @@ class DataSourceExtension(Enum):
trino = QueryTrinoDTO
local_file = QueryLocalFileDTO
s3_file = QueryS3FileDTO
minio_file = QueryMinioFileDTO

def __init__(self, dto: QueryDTO):
self.dto = dto
Expand Down
7 changes: 6 additions & 1 deletion ibis-server/app/model/metadata/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
from app.model.metadata.metadata import Metadata
from app.model.metadata.mssql import MSSQLMetadata
from app.model.metadata.mysql import MySQLMetadata
from app.model.metadata.object_storage import LocalFileMetadata, S3FileMetadata
from app.model.metadata.object_storage import (
LocalFileMetadata,
MinioFileMetadata,
S3FileMetadata,
)
from app.model.metadata.postgres import PostgresMetadata
from app.model.metadata.snowflake import SnowflakeMetadata
from app.model.metadata.trino import TrinoMetadata
Expand All @@ -21,6 +25,7 @@
DataSource.snowflake: SnowflakeMetadata,
DataSource.local_file: LocalFileMetadata,
DataSource.s3_file: S3FileMetadata,
DataSource.minio_file: MinioFileMetadata,
}


Expand Down
41 changes: 40 additions & 1 deletion ibis-server/app/model/metadata/object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from app.model import (
LocalFileConnectionInfo,
MinioFileConnectionInfo,
S3FileConnectionInfo,
UnprocessableEntityError,
)
Expand All @@ -16,7 +17,7 @@
TableProperties,
)
from app.model.metadata.metadata import Metadata
from app.model.utils import init_duckdb_s3
from app.model.utils import init_duckdb_minio, init_duckdb_s3


class ObjectStorageMetadata(Metadata):
Expand Down Expand Up @@ -201,3 +202,41 @@ def _get_full_path(self, path):
path = path[1:]

return f"s3://{self.connection_info.bucket.get_secret_value()}/{path}"


class MinioFileMetadata(ObjectStorageMetadata):
    """Object-storage metadata backed by a MinIO bucket.

    DuckDB is configured through ``init_duckdb_minio`` and files are listed
    through an opendal ``s3`` operator pointed at the configured endpoint.
    """

    def __init__(self, connection_info: MinioFileConnectionInfo):
        super().__init__(connection_info)

    def get_version(self):
        """Return the fixed version label for this source."""
        return "Minio"

    def _get_connection(self):
        """Open a fresh DuckDB connection configured for MinIO access."""
        duck_conn = duckdb.connect()
        init_duckdb_minio(duck_conn, self.connection_info)
        logger.debug("Initialized duckdb minio")
        return duck_conn

    def _get_dal_operator(self):
        """Build the opendal ``s3`` operator for the configured bucket."""
        info: MinioFileConnectionInfo = self.connection_info

        # The scheme follows the ssl_enabled flag; the endpoint itself is
        # stored without a scheme.
        scheme = "https" if info.ssl_enabled else "http"
        endpoint = f"{scheme}://{info.endpoint.get_secret_value()}"

        return opendal.Operator(
            "s3",
            root=info.url.get_secret_value(),
            bucket=info.bucket.get_secret_value(),
            region="ap-northeast-1",
            endpoint=endpoint,
            secret_access_key=info.secret_key.get_secret_value(),
            access_key_id=info.access_key.get_secret_value(),
        )

    def _get_full_path(self, path):
        """Return the ``s3://<bucket>/<path>`` URI for ``path``.

        A single leading ``/`` on ``path`` is dropped before joining.
        """
        relative = path[1:] if path.startswith("/") else path
        bucket = self.connection_info.bucket.get_secret_value()
        return f"s3://{bucket}/{relative}"
26 changes: 25 additions & 1 deletion ibis-server/app/model/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from duckdb import DuckDBPyConnection, HTTPException

from app.model import S3FileConnectionInfo
from app.model import MinioFileConnectionInfo, S3FileConnectionInfo


def init_duckdb_s3(
Expand All @@ -20,3 +20,27 @@ def init_duckdb_s3(
raise Exception("Failed to create secret")
except HTTPException as e:
raise Exception("Failed to create secret", e)


def init_duckdb_minio(
    connection: DuckDBPyConnection, connection_info: MinioFileConnectionInfo
):
    """Configure ``connection`` so DuckDB can read from a MinIO endpoint.

    Creates the ``wren_minio`` S3 secret from the access and secret keys,
    then sets the endpoint, path-style URLs and the SSL flag on the
    connection.

    Raises:
        Exception: if the secret is not created, or if DuckDB raises an
            ``HTTPException`` during setup (kept as the cause).
    """

    def _sql_literal(value: str) -> str:
        # The values are embedded in the statement text, so a quote inside a
        # credential must be doubled or it would end the literal early.
        return "'" + value.replace("'", "''") + "'"

    key_id = _sql_literal(connection_info.access_key.get_secret_value())
    secret = _sql_literal(connection_info.secret_key.get_secret_value())
    # NOTE(review): the region is hard-coded; presumably MinIO ignores it —
    # confirm before relying on it.
    create_secret = f"""
    CREATE SECRET wren_minio (
        TYPE S3,
        KEY_ID {key_id},
        SECRET {secret},
        REGION 'ap-northeast-1'
    )
    """
    try:
        result = connection.execute(create_secret).fetchone()
        if result is None or not result[0]:
            raise Exception("Failed to create secret")
        connection.execute(
            "SET s3_endpoint=?", [connection_info.endpoint.get_secret_value()]
        )
        connection.execute("SET s3_url_style='path'")
        connection.execute("SET s3_use_ssl=?", [connection_info.ssl_enabled])
    except HTTPException as e:
        raise Exception("Failed to create secret", e) from e
Loading