6
6
import com .clickhouse .data .ClickHouseFormat ;
7
7
8
8
import java .io .IOException ;
9
+ import java .io .InputStream ;
9
10
import java .io .OutputStream ;
11
+ import java .io .Reader ;
12
+ import java .math .BigDecimal ;
13
+ import java .math .BigInteger ;
10
14
import java .time .LocalDate ;
11
15
import java .time .LocalDateTime ;
12
16
import java .time .ZonedDateTime ;
17
+ import java .util .Arrays ;
13
18
import java .util .List ;
14
19
15
20
21
26
* <p>
22
27
* Experimental API
23
28
*/
24
- public class RowBinaryFormatWriter {
29
+ public class RowBinaryFormatWriter implements ClickHouseBinaryFormatWriter {
25
30
26
31
private final OutputStream out ;
27
32
@@ -31,6 +36,10 @@ public class RowBinaryFormatWriter {
31
36
32
37
private final boolean defaultSupport ;
33
38
39
+ private int rowCount = 0 ;
40
+
41
+ private boolean rowStarted = false ; // indicates if at least one value was written to a row
42
+
34
43
public RowBinaryFormatWriter (OutputStream out , TableSchema tableSchema , ClickHouseFormat format ) {
35
44
if (format != ClickHouseFormat .RowBinary && format != ClickHouseFormat .RowBinaryWithDefaults ) {
36
45
throw new IllegalArgumentException ("Only RowBinary and RowBinaryWithDefaults are supported" );
@@ -42,96 +51,233 @@ public RowBinaryFormatWriter(OutputStream out, TableSchema tableSchema, ClickHou
42
51
this .defaultSupport = format == ClickHouseFormat .RowBinaryWithDefaults ;
43
52
}
44
53
54
+ @ Override
55
+ public OutputStream getOutputStream () {
56
+ return out ;
57
+ }
58
+
59
+ @ Override
60
+ public int getRowCount () {
61
+ return rowCount ;
62
+ }
63
+
64
+ @ Override
65
+ public ClickHouseFormat getFormat () {
66
+ return defaultSupport ? ClickHouseFormat .RowBinaryWithDefaults : ClickHouseFormat .RowBinary ;
67
+ }
68
+
69
+ @ Override
70
+ public void clearRow () {
71
+ Arrays .fill (row , null );
72
+ rowStarted = false ;
73
+ }
74
+
75
+ @ Override
45
76
public void setValue (String column , Object value ) {
46
77
setValue (tableSchema .nameToColumnIndex (column ), value );
47
78
}
48
79
80
+ @ Override
49
81
public void setValue (int colIndex , Object value ) {
50
82
row [colIndex - 1 ] = value ;
83
+ if (!rowStarted ) {
84
+ rowStarted = true ;
85
+ }
51
86
}
52
87
88
+ @ Override
53
89
public void commitRow () throws IOException {
54
- List <ClickHouseColumn > columnList = tableSchema .getColumns ();
55
- for (int i = 0 ; i < row .length ; i ++) {
56
- ClickHouseColumn column = columnList .get (i );
57
- // here we skip if we have a default value that is MATERIALIZED or ALIAS or ...
58
- if (column .hasDefault () && column .getDefaultValue () != ClickHouseColumn .DefaultValue .DEFAULT )
59
- continue ;
60
- if (RowBinaryFormatSerializer .writeValuePreamble (out , defaultSupport , column , row [i ])) {
61
- SerializerUtils .serializeData (out , row [i ], column );
90
+ if (rowStarted ) {
91
+ List <ClickHouseColumn > columnList = tableSchema .getColumns ();
92
+ for (int i = 0 ; i < row .length ; i ++) {
93
+ ClickHouseColumn column = columnList .get (i );
94
+ // here we skip if we have a default value that is MATERIALIZED or ALIAS or ...
95
+ if (column .hasDefault () && column .getDefaultValue () != ClickHouseColumn .DefaultValue .DEFAULT )
96
+ continue ;
97
+ if (RowBinaryFormatSerializer .writeValuePreamble (out , defaultSupport , column , row [i ])) {
98
+ SerializerUtils .serializeData (out , row [i ], column );
99
+ }
62
100
}
101
+ clearRow ();
102
+ rowCount ++;
63
103
}
64
104
}
65
105
106
+ @ Override
66
107
public void setByte (String column , byte value ) {
67
108
setValue (column , value );
68
109
}
69
110
111
+ @ Override
70
112
public void setByte (int colIndex , byte value ) {
71
113
setValue (colIndex , value );
72
114
}
73
115
116
+ @ Override
74
117
public void setShort (String column , short value ) {
75
118
setValue (column , value );
76
119
}
77
120
121
+ @ Override
78
122
public void setShort (int colIndex , short value ) {
79
123
setValue (colIndex , value );
80
124
}
81
125
126
+ @ Override
82
127
public void setInteger (String column , int value ) {
83
128
setValue (column , value );
84
129
}
85
130
131
+ @ Override
86
132
public void setInteger (int colIndex , int value ) {
87
133
setValue (colIndex , value );
88
134
}
89
135
136
+ @ Override
90
137
public void setLong (String column , long value ) {
91
138
setValue (column , value );
92
139
}
93
140
141
+ @ Override
94
142
public void setLong (int colIndex , long value ) {
95
143
setValue (colIndex , value );
96
144
}
97
145
146
+ @ Override
147
+ public void setBigInteger (int colIndex , BigInteger value ) {
148
+ setValue (colIndex , value );
149
+ }
150
+
151
+ @ Override
152
+ public void setBigInteger (String column , BigInteger value ) {
153
+ setValue (column , value );
154
+ }
155
+
156
+ @ Override
157
+ public void setFloat (int colIndex , float value ) {
158
+ setValue (colIndex , value );
159
+ }
160
+
161
+ @ Override
162
+ public void setFloat (String column , float value ) {
163
+ setValue (column , value );
164
+ }
165
+
166
+ @ Override
167
+ public void setDouble (int colIndex , double value ) {
168
+ setValue (colIndex , value );
169
+ }
170
+
171
+ @ Override
172
+ public void setDouble (String column , double value ) {
173
+ setValue (column , value );
174
+ }
175
+
176
+ @ Override
177
+ public void setBigDecimal (int colIndex , BigDecimal value ) {
178
+ setValue (colIndex , value );
179
+ }
180
+
181
+ @ Override
182
+ public void setBigDecimal (String column , BigDecimal value ) {
183
+ setValue (column , value );
184
+ }
185
+
186
+ @ Override
187
+ public void setBoolean (int colIndex , boolean value ) {
188
+ setValue (colIndex , value );
189
+ }
190
+
191
+ @ Override
192
+ public void setBoolean (String column , boolean value ) {
193
+ setValue (column , value );
194
+ }
195
+
196
+ @ Override
98
197
public void setString (String column , String value ) {
99
198
setValue (column , value );
100
199
}
101
200
201
+ @ Override
102
202
public void setString (int colIndex , String value ) {
103
203
setValue (colIndex , value );
104
204
}
105
205
206
+ @ Override
106
207
public void setDate (String column , LocalDate value ) {
107
208
setValue (column , value );
108
209
}
109
210
211
+ @ Override
110
212
public void setDate (int colIndex , LocalDate value ) {
111
213
setValue (colIndex , value );
112
214
}
113
215
216
+ @ Override
114
217
public void setDateTime (String column , LocalDateTime value ) {
115
218
setValue (column , value );
116
219
}
117
220
221
+ @ Override
118
222
public void setDateTime (int colIndex , LocalDateTime value ) {
119
223
setValue (colIndex , value );
120
224
}
121
225
226
+ @ Override
122
227
public void setDateTime (String column , ZonedDateTime value ) {
123
228
setValue (column , value );
124
229
}
125
230
231
+ @ Override
126
232
public void setDateTime (int colIndex , ZonedDateTime value ) {
127
233
setValue (colIndex , value );
128
234
}
129
235
236
+ @ Override
130
237
public void setList (String column , List <?> value ) {
131
238
setValue (column , value );
132
239
}
133
240
241
+ @ Override
134
242
public void setList (int colIndex , List <?> value ) {
135
243
setValue (colIndex , value );
136
244
}
245
+
246
+ @ Override
247
+ public void setInputStream (int colIndex , InputStream in , long len ) {
248
+ setValue (colIndex , new InputStreamHolder (in , len ));
249
+ }
250
+
251
+ @ Override
252
+ public void setInputStream (String column , InputStream in , long len ) {
253
+ setValue (column , new InputStreamHolder (in , len ));
254
+ }
255
+
256
+ @ Override
257
+ public void setReader (int colIndex , Reader reader , long len ) {
258
+ setValue (colIndex , new ReaderHolder (reader , len ));
259
+ }
260
+
261
+ @ Override
262
+ public void setReader (String column , Reader reader , long len ) {
263
+ setValue (column , new ReaderHolder (reader , len ));
264
+ }
265
+
266
+ private static class InputStreamHolder {
267
+ final InputStream stream ;
268
+ final long length ;
269
+ InputStreamHolder (InputStream stream , long length ) {
270
+ this .stream = stream ;
271
+ this .length = length ;
272
+ }
273
+ }
274
+
275
+ private static class ReaderHolder {
276
+ final Reader read ;
277
+ final long length ;
278
+ ReaderHolder (Reader reader , long length ) {
279
+ this .read = reader ;
280
+ this .length = length ;
281
+ }
282
+ }
137
283
}
0 commit comments