Skip to content

Commit 80f4ab9

Browse files
committed
feat(clickhouse): implement proper type serialization
1 parent 1c51969 commit 80f4ab9

20 files changed

+557
-324
lines changed

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
version: "3.4"
22
services:
33
clickhouse:
4-
image: yandex/clickhouse-server:22-alpine
4+
image: clickhouse/clickhouse-server:22-alpine
55
ports:
66
- 8123:8123
77
- 9000:9000

ibis/backends/clickhouse/__init__.py

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,9 @@
1111
import ibis.config
1212
import ibis.expr.schema as sch
1313
from ibis.backends.base.sql import BaseSQLBackend
14-
from ibis.backends.clickhouse.client import (
15-
ClickhouseDataType,
16-
ClickhouseTable,
17-
fully_qualified_re,
18-
)
14+
from ibis.backends.clickhouse.client import ClickhouseTable, fully_qualified_re
1915
from ibis.backends.clickhouse.compiler import ClickhouseCompiler
16+
from ibis.backends.clickhouse.datatypes import parse, serialize
2017
from ibis.config import options
2118

2219
_default_compression: str | bool
@@ -109,12 +106,12 @@ def current_database(self):
109106
return self.con.connection.database
110107

111108
def list_databases(self, like=None):
112-
data, schema = self.raw_sql('SELECT name FROM system.databases')
109+
data, _ = self.raw_sql('SELECT name FROM system.databases')
113110
databases = list(data[0])
114111
return self._filter_with_like(databases, like)
115112

116113
def list_tables(self, like=None, database=None):
117-
data, schema = self.raw_sql('SHOW TABLES')
114+
data, _ = self.raw_sql('SHOW TABLES')
118115
databases = list(data[0])
119116
return self._filter_with_like(databases, like)
120117

@@ -152,13 +149,7 @@ def raw_sql(
152149
'name': name,
153150
'data': df.to_dict('records'),
154151
'structure': list(
155-
zip(
156-
schema.names,
157-
[
158-
str(ClickhouseDataType.from_ibis(t))
159-
for t in schema.types
160-
],
161-
)
152+
zip(schema.names, map(serialize, schema.types))
162153
),
163154
}
164155
)
@@ -216,9 +207,8 @@ def get_schema(
216207
(column_names, types, *_), *_ = self.raw_sql(
217208
f"DESCRIBE {qualified_name}"
218209
)
219-
return sch.Schema.from_tuples(
220-
zip(column_names, map(ClickhouseDataType.parse, types))
221-
)
210+
211+
return sch.Schema.from_tuples(zip(column_names, map(parse, types)))
222212

223213
def set_options(self, options):
224214
self.con.set_options(options)
@@ -238,7 +228,7 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
238228
)
239229
[plan] = json.loads(raw_plans)
240230
fields = [
241-
(field["Name"], ClickhouseDataType.parse(field["Type"]))
231+
(field["Name"], parse(field["Type"]))
242232
for field in plan["Plan"]["Header"]
243233
]
244234
return sch.Schema.from_tuples(fields)

ibis/backends/clickhouse/client.py

Lines changed: 12 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -4,142 +4,22 @@
44
import numpy as np
55
import pandas as pd
66

7-
import ibis.common.exceptions as com
87
import ibis.expr.datatypes as dt
98
import ibis.expr.types as ir
109

1110
fully_qualified_re = re.compile(r"(.*)\.(?:`(.*)`|(.*))")
12-
base_typename_re = re.compile(r"(\w+)")
13-
14-
15-
_clickhouse_dtypes = {
16-
'Null': dt.Null,
17-
'Nothing': dt.Null,
18-
'UInt8': dt.UInt8,
19-
'UInt16': dt.UInt16,
20-
'UInt32': dt.UInt32,
21-
'UInt64': dt.UInt64,
22-
'Int8': dt.Int8,
23-
'Int16': dt.Int16,
24-
'Int32': dt.Int32,
25-
'Int64': dt.Int64,
26-
'Float32': dt.Float32,
27-
'Float64': dt.Float64,
28-
'String': dt.String,
29-
'FixedString': dt.String,
30-
'Date': dt.Date,
31-
'DateTime': dt.Timestamp,
32-
'DateTime64': dt.Timestamp,
33-
'Array': dt.Array,
34-
}
35-
_ibis_dtypes = {v: k for k, v in _clickhouse_dtypes.items()}
36-
_ibis_dtypes[dt.String] = 'String'
37-
_ibis_dtypes[dt.Timestamp] = 'DateTime'
38-
39-
40-
class ClickhouseDataType:
41-
42-
__slots__ = 'typename', 'base_typename', 'nullable'
43-
44-
def __init__(self, typename, nullable=False):
45-
m = base_typename_re.match(typename)
46-
self.base_typename = m.groups()[0]
47-
if self.base_typename not in _clickhouse_dtypes:
48-
raise com.UnsupportedBackendType(typename)
49-
self.typename = self.base_typename
50-
self.nullable = nullable
51-
52-
if self.base_typename == 'Array':
53-
self.typename = typename
54-
55-
def __str__(self):
56-
if self.nullable:
57-
return f'Nullable({self.typename})'
58-
else:
59-
return self.typename
60-
61-
def __repr__(self):
62-
return f'<Clickhouse {str(self)}>'
63-
64-
@classmethod
65-
def parse(cls, spec):
66-
# TODO(kszucs): spare parsing, depends on clickhouse-driver#22
67-
if spec.startswith('Nullable'):
68-
return cls(spec[9:-1], nullable=True)
69-
else:
70-
return cls(spec)
71-
72-
def to_ibis(self):
73-
if self.base_typename != 'Array':
74-
return _clickhouse_dtypes[self.typename](nullable=self.nullable)
75-
76-
sub_type = ClickhouseDataType(
77-
self.get_subname(self.typename)
78-
).to_ibis()
79-
return dt.Array(value_type=sub_type)
80-
81-
@staticmethod
82-
def get_subname(name: str) -> str:
83-
lbracket_pos = name.find('(')
84-
rbracket_pos = name.rfind(')')
85-
86-
if lbracket_pos == -1 or rbracket_pos == -1:
87-
return ''
88-
89-
subname = name[lbracket_pos + 1 : rbracket_pos]
90-
return subname
91-
92-
@staticmethod
93-
def get_typename_from_ibis_dtype(dtype):
94-
if not isinstance(dtype, dt.Array):
95-
return _ibis_dtypes[type(dtype)]
96-
97-
return 'Array({})'.format(
98-
ClickhouseDataType.get_typename_from_ibis_dtype(dtype.value_type)
99-
)
100-
101-
@classmethod
102-
def from_ibis(cls, dtype, nullable=None):
103-
typename = ClickhouseDataType.get_typename_from_ibis_dtype(dtype)
104-
if nullable is None:
105-
nullable = dtype.nullable
106-
return cls(typename, nullable=nullable)
107-
108-
109-
@dt.dtype.register(ClickhouseDataType)
110-
def clickhouse_to_ibis_dtype(clickhouse_dtype):
111-
return clickhouse_dtype.to_ibis()
11211

11312

11413
class ClickhouseTable(ir.TableExpr):
11514
"""References a physical table in Clickhouse"""
11615

11716
@property
11817
def _qualified_name(self):
119-
return self.op().args[0]
120-
121-
@property
122-
def _unqualified_name(self):
123-
return self._match_name()[1]
18+
return self.op().name
12419

12520
@property
12621
def _client(self):
127-
return self.op().args[2]
128-
129-
def _match_name(self):
130-
m = fully_qualified_re.match(self._qualified_name)
131-
if not m:
132-
raise com.IbisError(
133-
'Cannot determine database name from {}'.format(
134-
self._qualified_name
135-
)
136-
)
137-
db, quoted, unquoted = m.groups()
138-
return db, quoted or unquoted
139-
140-
@property
141-
def _database(self):
142-
return self._match_name()[0]
22+
return self.op().source
14323

14424
def invalidate_metadata(self):
14525
self._client.invalidate_metadata(self._qualified_name)
@@ -168,10 +48,8 @@ def insert(self, obj, **kwargs):
16848
assert isinstance(obj, pd.DataFrame)
16949
assert set(schema.names) >= set(obj.columns)
17050

171-
columns = ', '.join(map(quote_identifier, obj.columns))
172-
query = 'INSERT INTO {table} ({columns}) VALUES'.format(
173-
table=self._qualified_name, columns=columns
174-
)
51+
columns = ", ".join(map(quote_identifier, obj.columns))
52+
query = f"INSERT INTO {self._qualified_name} ({columns}) VALUES"
17553

17654
# convert data columns with datetime64 pandas dtype to native date
17755
# because clickhouse-driver 0.0.10 does arithmetic operations on it
@@ -180,5 +58,11 @@ def insert(self, obj, **kwargs):
18058
if isinstance(schema[col], dt.Date):
18159
obj[col] = obj[col].dt.date
18260

183-
data = obj.to_dict('records')
184-
return self._client.con.execute(query, data, **kwargs)
61+
settings = kwargs.pop("settings", {})
62+
settings["use_numpy"] = True
63+
return self._client.con.insert_dataframe(
64+
query,
65+
obj,
66+
settings=settings,
67+
**kwargs,
68+
)

0 commit comments

Comments
 (0)