3
3
import base64
4
4
from enum import Enum , StrEnum , auto
5
5
from json import loads
6
- from urllib .parse import urlparse
7
6
8
7
import ibis
9
8
from google .oauth2 import service_account
13
12
BigQueryConnectionInfo ,
14
13
ClickHouseConnectionInfo ,
15
14
ConnectionInfo ,
16
- ConnectionUrl ,
17
15
MSSqlConnectionInfo ,
18
16
MySqlConnectionInfo ,
19
17
PostgresConnectionInfo ,
@@ -66,6 +64,8 @@ def __init__(self, dto: QueryDTO):
66
64
67
65
def get_connection (self , info : ConnectionInfo ) -> BaseBackend :
68
66
try :
67
+ if hasattr (info , "connection_url" ):
68
+ return ibis .connect (info .connection_url .get_secret_value ())
69
69
return getattr (self , f"get_{ self .name } _connection" )(info )
70
70
except KeyError :
71
71
raise NotImplementedError (f"Unsupported data source: { self } " )
@@ -85,13 +85,7 @@ def get_bigquery_connection(info: BigQueryConnectionInfo) -> BaseBackend:
85
85
)
86
86
87
87
@staticmethod
88
- def get_clickhouse_connection (
89
- info : ConnectionUrl | ClickHouseConnectionInfo ,
90
- ) -> BaseBackend :
91
- if hasattr (info , "connection_url" ):
92
- url = info .connection_url .get_secret_value ()
93
- # ibis miss port of connection url, so we need to pass it explicitly
94
- return ibis .connect (url , port = urlparse (url ).port )
88
+ def get_clickhouse_connection (info : ClickHouseConnectionInfo ) -> BaseBackend :
95
89
return ibis .clickhouse .connect (
96
90
host = info .host .get_secret_value (),
97
91
port = int (info .port .get_secret_value ()),
@@ -102,7 +96,6 @@ def get_clickhouse_connection(
102
96
103
97
@staticmethod
104
98
def get_mssql_connection (info : MSSqlConnectionInfo ) -> BaseBackend :
105
- # mssql in ibis does not support connection url
106
99
return ibis .mssql .connect (
107
100
host = info .host .get_secret_value (),
108
101
port = info .port .get_secret_value (),
@@ -113,13 +106,7 @@ def get_mssql_connection(info: MSSqlConnectionInfo) -> BaseBackend:
113
106
)
114
107
115
108
@staticmethod
116
- def get_mysql_connection (
117
- info : ConnectionUrl | MySqlConnectionInfo ,
118
- ) -> BaseBackend :
119
- if hasattr (info , "connection_url" ):
120
- url = info .connection_url .get_secret_value ()
121
- # ibis miss port of connection url, so we need to pass it explicitly
122
- return ibis .connect (url , port = urlparse (url ).port )
109
+ def get_mysql_connection (info : MySqlConnectionInfo ) -> BaseBackend :
123
110
return ibis .mysql .connect (
124
111
host = info .host .get_secret_value (),
125
112
port = int (info .port .get_secret_value ()),
@@ -129,11 +116,7 @@ def get_mysql_connection(
129
116
)
130
117
131
118
@staticmethod
132
- def get_postgres_connection (
133
- info : ConnectionUrl | PostgresConnectionInfo ,
134
- ) -> BaseBackend :
135
- if hasattr (info , "connection_url" ):
136
- return ibis .connect (info .connection_url .get_secret_value ())
119
+ def get_postgres_connection (info : PostgresConnectionInfo ) -> BaseBackend :
137
120
return ibis .postgres .connect (
138
121
host = info .host .get_secret_value (),
139
122
port = int (info .port .get_secret_value ()),
@@ -153,11 +136,7 @@ def get_snowflake_connection(info: SnowflakeConnectionInfo) -> BaseBackend:
153
136
)
154
137
155
138
@staticmethod
156
- def get_trino_connection (
157
- info : ConnectionUrl | TrinoConnectionInfo ,
158
- ) -> BaseBackend :
159
- if hasattr (info , "connection_url" ):
160
- return ibis .connect (info .connection_url .get_secret_value ())
139
+ def get_trino_connection (info : TrinoConnectionInfo ) -> BaseBackend :
161
140
return ibis .trino .connect (
162
141
host = info .host .get_secret_value (),
163
142
port = int (info .port .get_secret_value ()),
0 commit comments