Commit ace4c4b

fix(sink): add cassandra batch size and fix bigquery array null (#15516)
1 parent d1b4612 commit ace4c4b

10 files changed: +106 -42

integration_tests/big-query-sink/create_sink.sql

+1 -1

@@ -23,7 +23,7 @@ FROM
 -- bigquery.dataset= '${dataset_id}',
 -- bigquery.table= '${table_id}',
 -- access_key = '${aws_access_key}',
--- secret_access = '${aws_secret_access}',
+-- secret_key = '${aws_secret_key}',
 -- region = '${aws_region}',
 -- force_append_only='true',
 -- );

java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java

+32

@@ -42,6 +42,12 @@ public class CassandraConfig extends CommonSinkConfig {
     @JsonProperty(value = "cassandra.password")
     private String password;
 
+    @JsonProperty(value = "cassandra.max_batch_rows")
+    private Integer maxBatchRows = 512;
+
+    @JsonProperty(value = "cassandra.request_timeout_ms")
+    private Integer requestTimeoutMs = 2000;
+
     @JsonCreator
     public CassandraConfig(
             @JsonProperty(value = "cassandra.url") String url,

@@ -93,4 +99,30 @@ public CassandraConfig withPassword(String password) {
         this.password = password;
         return this;
     }
+
+    public Integer getMaxBatchRows() {
+        return maxBatchRows;
+    }
+
+    public CassandraConfig withMaxBatchRows(Integer maxBatchRows) {
+        if (maxBatchRows > 65536 || maxBatchRows < 1) {
+            throw new IllegalArgumentException(
+                    "Cassandra sink option: maxBatchRows must be <= 65535 and >= 1");
+        }
+        this.maxBatchRows = maxBatchRows;
+        return this;
+    }
+
+    public Integer getRequestTimeoutMs() {
+        return requestTimeoutMs;
+    }
+
+    public CassandraConfig withRequestTimeoutMs(Integer requestTimeoutMs) {
+        if (requestTimeoutMs < 1) {
+            throw new IllegalArgumentException(
+                    "Cassandra sink option: requestTimeoutMs must be >= 1");
+        }
+        this.requestTimeoutMs = requestTimeoutMs;
+        return this;
+    }
 }

java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java

+10 -2

@@ -18,6 +18,8 @@
 
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
 import com.datastax.oss.driver.api.core.cql.*;
 import com.risingwave.connector.api.TableSchema;
 import com.risingwave.connector.api.sink.SinkRow;

@@ -34,7 +36,6 @@
 
 public class CassandraSink extends SinkWriterBase {
     private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);
-    private static final Integer MAX_BATCH_SIZE = 1024 * 16;
 
     private final CqlSession session;
     private final List<SinkRow> updateRowCache = new ArrayList<>(1);

@@ -51,9 +52,16 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
             throw new IllegalArgumentException(
                     "Invalid cassandraURL: expected `host:port`, got " + url);
         }
+
+        DriverConfigLoader loader =
+                DriverConfigLoader.programmaticBuilder()
+                        .withInt(DefaultDriverOption.REQUEST_TIMEOUT, config.getRequestTimeoutMs())
+                        .build();
+
         // check connection
         CqlSessionBuilder sessionBuilder =
                 CqlSession.builder()
+                        .withConfigLoader(loader)
                         .addContactPoint(
                                 new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1])))
                         .withKeyspace(config.getKeyspace())

@@ -163,7 +171,7 @@ private void write_upsert(Iterator<SinkRow> rows) {
     }
 
     private void tryCommit() {
-        if (batchBuilder.getStatementsCount() >= MAX_BATCH_SIZE) {
+        if (batchBuilder.getStatementsCount() >= config.getMaxBatchRows()) {
             sync();
         }
     }
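
With these changes, the Cassandra batch flush threshold and driver request timeout become user-facing sink options (`cassandra.max_batch_rows`, default 512, and `cassandra.request_timeout_ms`, default 2000 ms). A minimal usage sketch follows; the sink, materialized view, keyspace, and table names are illustrative placeholders, and options other than the two new keys are shown only for context:

CREATE SINK cassandra_demo_sink
FROM demo_mv WITH (
    connector = 'cassandra',
    type = 'append-only',
    force_append_only = 'true',
    cassandra.url = '127.0.0.1:9042',          -- must be `host:port`
    cassandra.keyspace = 'demo_keyspace',
    cassandra.table = 'demo_table',
    cassandra.max_batch_rows = '512',          -- new: statements buffered before sync()
    cassandra.request_timeout_ms = '5000'      -- new: per-request timeout passed to the driver
);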

src/connector/src/common.rs

+1 -1

@@ -100,7 +100,7 @@ impl AwsAuthProps {
                 ),
             ))
         } else {
-            bail!("Both \"access_key\" and \"secret_access\" are required.")
+            bail!("Both \"access_key\" and \"secret_key\" are required.")
         }
     }

src/connector/src/sink/big_query.rs

+24 -16

@@ -28,15 +28,12 @@ use risingwave_common::catalog::Schema;
 use risingwave_common::types::DataType;
 use serde_derive::Deserialize;
 use serde_json::Value;
-use serde_with::serde_as;
+use serde_with::{serde_as, DisplayFromStr};
 use url::Url;
 use with_options::WithOptions;
 use yup_oauth2::ServiceAccountKey;
 
-use super::encoder::{
-    DateHandlingMode, JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
-    TimestamptzHandlingMode,
-};
+use super::encoder::{JsonEncoder, RowEncoder};
 use super::writer::LogSinkerOf;
 use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
 use crate::aws_utils::load_file_descriptor_from_s3;

@@ -47,8 +44,8 @@ use crate::sink::{
 };
 
 pub const BIGQUERY_SINK: &str = "bigquery";
-const BIGQUERY_INSERT_MAX_NUMS: usize = 1024;
 
+#[serde_as]
 #[derive(Deserialize, Debug, Clone, WithOptions)]
 pub struct BigQueryCommon {
     #[serde(rename = "bigquery.local.path")]

@@ -61,6 +58,13 @@ pub struct BigQueryCommon {
     pub dataset: String,
     #[serde(rename = "bigquery.table")]
     pub table: String,
+    #[serde(rename = "bigquery.max_batch_rows", default = "default_max_batch_rows")]
+    #[serde_as(as = "DisplayFromStr")]
+    pub max_batch_rows: usize,
+}
+
+fn default_max_batch_rows() -> usize {
+    1024
 }
 
 impl BigQueryCommon {

@@ -312,14 +316,7 @@ impl BigQuerySinkWriter {
             client,
             is_append_only,
             insert_request: TableDataInsertAllRequest::new(),
-            row_encoder: JsonEncoder::new(
-                schema,
-                None,
-                DateHandlingMode::String,
-                TimestampHandlingMode::String,
-                TimestamptzHandlingMode::UtcString,
-                TimeHandlingMode::Milli,
-            ),
+            row_encoder: JsonEncoder::new_with_bigquery(schema, None),
         })
     }
 

@@ -339,7 +336,11 @@ impl BigQuerySinkWriter {
         self.insert_request
             .add_rows(insert_vec)
             .map_err(|e| SinkError::BigQuery(e.into()))?;
-        if self.insert_request.len().ge(&BIGQUERY_INSERT_MAX_NUMS) {
+        if self
+            .insert_request
+            .len()
+            .ge(&self.config.common.max_batch_rows)
+        {
             self.insert_data().await?;
         }
         Ok(())

@@ -349,7 +350,8 @@
         if !self.insert_request.is_empty() {
             let insert_request =
                 mem::replace(&mut self.insert_request, TableDataInsertAllRequest::new());
-            self.client
+            let request = self
+                .client
                 .tabledata()
                 .insert_all(
                     &self.config.common.project,

@@ -359,6 +361,12 @@
                 )
                 .await
                 .map_err(|e| SinkError::BigQuery(e.into()))?;
+            if let Some(error) = request.insert_errors {
+                return Err(SinkError::BigQuery(anyhow::anyhow!(
+                    "Insert error: {:?}",
+                    error
+                )));
+            }
         }
         Ok(())
     }
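
The previously hard-coded BIGQUERY_INSERT_MAX_NUMS (1024) is now the `bigquery.max_batch_rows` sink option (default 1024), and the insert_all response is checked for insert_errors instead of being ignored. A minimal sketch of setting the new option, using the local service-account path variant that appears in this diff; the sink, materialized view, dataset, table, and path values are illustrative placeholders:

CREATE SINK bq_demo_sink
FROM demo_mv WITH (
    connector = 'bigquery',
    type = 'append-only',
    force_append_only = 'true',
    bigquery.local.path = '/path/to/service-account.json',
    bigquery.dataset = 'demo_dataset',
    bigquery.table = 'demo_table',
    bigquery.max_batch_rows = '2048'   -- new: rows buffered before each insert_all call, default 1024
);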

src/connector/src/sink/doris.rs

+2 -7

@@ -39,7 +39,7 @@ use super::doris_starrocks_connector::{
     POOL_IDLE_TIMEOUT,
 };
 use super::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
-use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
+use crate::sink::encoder::{JsonEncoder, RowEncoder};
 use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
 use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam};
 

@@ -294,12 +294,7 @@ impl DorisSinkWriter {
             inserter_inner_builder: doris_insert_builder,
             is_append_only,
             client: None,
-            row_encoder: JsonEncoder::new_with_doris(
-                schema,
-                None,
-                TimestampHandlingMode::String,
-                decimal_map,
-            ),
+            row_encoder: JsonEncoder::new_with_doris(schema, None, decimal_map),
         })
     }

src/connector/src/sink/encoder/json.rs

+28 -8

@@ -83,15 +83,14 @@ impl JsonEncoder {
     pub fn new_with_doris(
         schema: Schema,
         col_indices: Option<Vec<usize>>,
-        timestamp_handling_mode: TimestampHandlingMode,
         map: HashMap<String, (u8, u8)>,
     ) -> Self {
         Self {
             schema,
             col_indices,
             time_handling_mode: TimeHandlingMode::Milli,
             date_handling_mode: DateHandlingMode::String,
-            timestamp_handling_mode,
+            timestamp_handling_mode: TimestampHandlingMode::String,
             timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
             custom_json_type: CustomJsonType::Doris(map),
             kafka_connect: None,

@@ -101,21 +100,33 @@ impl JsonEncoder {
     pub fn new_with_starrocks(
         schema: Schema,
         col_indices: Option<Vec<usize>>,
-        timestamp_handling_mode: TimestampHandlingMode,
         map: HashMap<String, (u8, u8)>,
     ) -> Self {
         Self {
             schema,
             col_indices,
             time_handling_mode: TimeHandlingMode::Milli,
             date_handling_mode: DateHandlingMode::String,
-            timestamp_handling_mode,
+            timestamp_handling_mode: TimestampHandlingMode::String,
             timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
             custom_json_type: CustomJsonType::StarRocks(map),
             kafka_connect: None,
         }
     }
 
+    pub fn new_with_bigquery(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
+        Self {
+            schema,
+            col_indices,
+            time_handling_mode: TimeHandlingMode::Milli,
+            date_handling_mode: DateHandlingMode::String,
+            timestamp_handling_mode: TimestampHandlingMode::String,
+            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
+            custom_json_type: CustomJsonType::BigQuery,
+            kafka_connect: None,
+        }
+    }
+
     pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
         Self {
             kafka_connect: Some(Arc::new(kafka_connect)),

@@ -192,7 +203,16 @@
     custom_json_type: &CustomJsonType,
 ) -> ArrayResult<Value> {
     let scalar_ref = match datum {
-        None => return Ok(Value::Null),
+        None => {
+            if let CustomJsonType::BigQuery = custom_json_type
+                && matches!(field.data_type(), DataType::List(_))
+            {
+                // Bigquery need to convert null of array to empty array https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
+                return Ok(Value::Array(vec![]));
+            } else {
+                return Ok(Value::Null);
+            }
+        }
         Some(datum) => datum,
     };
 

@@ -239,7 +259,7 @@
             }
             json!(v_string)
         }
-        CustomJsonType::Es | CustomJsonType::None => {
+        CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => {
             json!(v.to_text())
         }
     },

@@ -291,7 +311,7 @@
         }
         (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type {
             CustomJsonType::Es | CustomJsonType::StarRocks(_) => JsonbVal::from(jsonb_ref).take(),
-            CustomJsonType::Doris(_) | CustomJsonType::None => {
+            CustomJsonType::Doris(_) | CustomJsonType::None | CustomJsonType::BigQuery => {
                 json!(jsonb_ref.to_string())
             }
         },

@@ -342,7 +362,7 @@
                     "starrocks can't support struct".to_string(),
                 ));
             }
-            CustomJsonType::Es | CustomJsonType::None => {
+            CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => {
                 let mut map = Map::with_capacity(st.len());
                 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
                     st.iter()
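
For the BigQuery encoder, a NULL datum in a column of an array type (DataType::List) is now serialized as an empty JSON array instead of JSON null, since BigQuery ARRAY columns cannot store NULL (see the linked data-types documentation). A small illustration under an assumed schema; the table name and columns are hypothetical:

CREATE TABLE demo_events (id INT, tags VARCHAR[]);

-- With a BigQuery sink on demo_events, a row (1, NULL) is encoded by
-- JsonEncoder::new_with_bigquery as {"id": 1, "tags": []} rather than
-- {"id": 1, "tags": null}; non-array columns still encode NULL as null.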

src/connector/src/sink/encoder/mod.rs

+2

@@ -144,6 +144,8 @@ pub enum CustomJsonType {
     Es,
     // starrocks' need jsonb is struct
     StarRocks(HashMap<String, (u8, u8)>),
+    // bigquery need null array -> []
+    BigQuery,
     None,
 }

src/connector/src/sink/starrocks.rs

+2 -7

@@ -35,7 +35,7 @@ use with_options::WithOptions;
 use super::doris_starrocks_connector::{
     HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_SUCCESS_STATUS, STARROCKS_DELETE_SIGN,
 };
-use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
+use super::encoder::{JsonEncoder, RowEncoder};
 use super::writer::LogSinkerOf;
 use super::{SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
 use crate::sink::writer::SinkWriterExt;

@@ -367,12 +367,7 @@ impl StarrocksSinkWriter {
             inserter_innet_builder: starrocks_insert_builder,
             is_append_only,
             client: None,
-            row_encoder: JsonEncoder::new_with_starrocks(
-                schema,
-                None,
-                TimestampHandlingMode::String,
-                decimal_map,
-            ),
+            row_encoder: JsonEncoder::new_with_starrocks(schema, None, decimal_map),
         })
     }

src/connector/with_options_sink.yaml

+4

@@ -17,6 +17,10 @@ BigQueryConfig:
   - name: bigquery.table
     field_type: String
     required: true
+  - name: bigquery.max_batch_rows
+    field_type: usize
+    required: false
+    default: '1024'
   - name: region
     field_type: String
     required: false
