Commit 92460c7

fix(sink): add cassandra batch size and fix bigquery array null (#15516) (#15559)
1 parent 423b0d9 commit 92460c7

10 files changed, +106 -42 lines changed


integration_tests/big-query-sink/create_sink.sql

+1-1
@@ -23,7 +23,7 @@ FROM
 -- bigquery.dataset= '${dataset_id}',
 -- bigquery.table= '${table_id}',
 -- access_key = '${aws_access_key}',
--- secret_access = '${aws_secret_access}',
+-- secret_key = '${aws_secret_key}',
 -- region = '${aws_region}',
 -- force_append_only='true',
 -- );

java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java

+32
@@ -41,6 +41,12 @@ public class CassandraConfig extends CommonSinkConfig {
     @JsonProperty(value = "cassandra.password")
     private String password;

+    @JsonProperty(value = "cassandra.max_batch_rows")
+    private Integer maxBatchRows = 512;
+
+    @JsonProperty(value = "cassandra.request_timeout_ms")
+    private Integer requestTimeoutMs = 2000;
+
     @JsonCreator
     public CassandraConfig(
             @JsonProperty(value = "cassandra.url") String url,
@@ -92,4 +98,30 @@ public CassandraConfig withPassword(String password) {
         this.password = password;
         return this;
     }
+
+    public Integer getMaxBatchRows() {
+        return maxBatchRows;
+    }
+
+    public CassandraConfig withMaxBatchRows(Integer maxBatchRows) {
+        if (maxBatchRows > 65536 || maxBatchRows < 1) {
+            throw new IllegalArgumentException(
+                    "Cassandra sink option: maxBatchRows must be <= 65535 and >= 1");
+        }
+        this.maxBatchRows = maxBatchRows;
+        return this;
+    }
+
+    public Integer getRequestTimeoutMs() {
+        return requestTimeoutMs;
+    }
+
+    public CassandraConfig withRequestTimeoutMs(Integer requestTimeoutMs) {
+        if (requestTimeoutMs < 1) {
+            throw new IllegalArgumentException(
+                    "Cassandra sink option: requestTimeoutMs must be >= 1");
+        }
+        this.requestTimeoutMs = requestTimeoutMs;
+        return this;
+    }
 }

java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java

+10-2
@@ -18,6 +18,8 @@

 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
 import com.datastax.oss.driver.api.core.cql.*;
 import com.risingwave.connector.api.TableSchema;
 import com.risingwave.connector.api.sink.SinkRow;
@@ -34,7 +36,6 @@

 public class CassandraSink extends SinkWriterBase {
     private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);
-    private static final Integer MAX_BATCH_SIZE = 1024 * 16;

     private final CqlSession session;
     private final List<SinkRow> updateRowCache = new ArrayList<>(1);
@@ -51,9 +52,16 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
             throw new IllegalArgumentException(
                     "Invalid cassandraURL: expected `host:port`, got " + url);
         }
+
+        DriverConfigLoader loader =
+                DriverConfigLoader.programmaticBuilder()
+                        .withInt(DefaultDriverOption.REQUEST_TIMEOUT, config.getRequestTimeoutMs())
+                        .build();
+
         // check connection
         CqlSessionBuilder sessionBuilder =
                 CqlSession.builder()
+                        .withConfigLoader(loader)
                         .addContactPoint(
                                 new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1])))
                         .withKeyspace(config.getKeyspace())
@@ -163,7 +171,7 @@ private void write_upsert(Iterator<SinkRow> rows) {
     }

     private void tryCommit() {
-        if (batchBuilder.getStatementsCount() >= MAX_BATCH_SIZE) {
+        if (batchBuilder.getStatementsCount() >= config.getMaxBatchRows()) {
             sync();
         }
     }

src/connector/src/common.rs

+1-1
@@ -101,7 +101,7 @@ impl AwsAuthProps
             ))
         } else {
             Err(anyhow!(
-                "Both \"access_key\" and \"secret_access\" are required."
+                "Both \"access_key\" and \"secret_key\" are required."
             ))
         }
     }
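
The corrected message now matches the option name the connector accepts (`secret_key`, as in the `create_sink.sql` change above). Below is a minimal illustrative sketch of the check this error guards, with a made-up function name and simplified types rather than the real `AwsAuthProps` method:

// Illustrative only: simplified stand-in for the credential check, not the
// connector's actual code. The error is returned unless both keys are given.
fn build_static_credentials(
    access_key: Option<&str>,
    secret_key: Option<&str>,
) -> Result<(String, String), String> {
    match (access_key, secret_key) {
        (Some(ak), Some(sk)) => Ok((ak.to_owned(), sk.to_owned())),
        _ => Err("Both \"access_key\" and \"secret_key\" are required.".to_owned()),
    }
}

fn main() {
    assert!(build_static_credentials(Some("my-access-key"), Some("my-secret-key")).is_ok());
    assert!(build_static_credentials(Some("my-access-key"), None).is_err());
}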

src/connector/src/sink/big_query.rs

+24-16
@@ -28,15 +28,12 @@ use risingwave_common::catalog::Schema;
 use risingwave_common::types::DataType;
 use serde_derive::Deserialize;
 use serde_json::Value;
-use serde_with::serde_as;
+use serde_with::{serde_as, DisplayFromStr};
 use url::Url;
 use with_options::WithOptions;
 use yup_oauth2::ServiceAccountKey;

-use super::encoder::{
-    DateHandlingMode, JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
-    TimestamptzHandlingMode,
-};
+use super::encoder::{JsonEncoder, RowEncoder};
 use super::writer::LogSinkerOf;
 use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
 use crate::aws_utils::load_file_descriptor_from_s3;
@@ -47,8 +44,8 @@ use crate::sink::{
 };

 pub const BIGQUERY_SINK: &str = "bigquery";
-const BIGQUERY_INSERT_MAX_NUMS: usize = 1024;

+#[serde_as]
 #[derive(Deserialize, Debug, Clone, WithOptions)]
 pub struct BigQueryCommon {
     #[serde(rename = "bigquery.local.path")]
@@ -61,6 +58,13 @@ pub struct BigQueryCommon {
     pub dataset: String,
     #[serde(rename = "bigquery.table")]
     pub table: String,
+    #[serde(rename = "bigquery.max_batch_rows", default = "default_max_batch_rows")]
+    #[serde_as(as = "DisplayFromStr")]
+    pub max_batch_rows: usize,
+}
+
+fn default_max_batch_rows() -> usize {
+    1024
 }

 impl BigQueryCommon {
@@ -312,14 +316,7 @@ impl BigQuerySinkWriter {
             client,
             is_append_only,
             insert_request: TableDataInsertAllRequest::new(),
-            row_encoder: JsonEncoder::new(
-                schema,
-                None,
-                DateHandlingMode::String,
-                TimestampHandlingMode::String,
-                TimestamptzHandlingMode::UtcString,
-                TimeHandlingMode::Milli,
-            ),
+            row_encoder: JsonEncoder::new_with_bigquery(schema, None),
         })
     }

@@ -339,7 +336,11 @@ impl BigQuerySinkWriter {
         self.insert_request
             .add_rows(insert_vec)
             .map_err(|e| SinkError::BigQuery(e.into()))?;
-        if self.insert_request.len().ge(&BIGQUERY_INSERT_MAX_NUMS) {
+        if self
+            .insert_request
+            .len()
+            .ge(&self.config.common.max_batch_rows)
+        {
             self.insert_data().await?;
         }
         Ok(())
@@ -349,7 +350,8 @@ impl BigQuerySinkWriter {
         if !self.insert_request.is_empty() {
            let insert_request =
                mem::replace(&mut self.insert_request, TableDataInsertAllRequest::new());
-            self.client
+            let request = self
+                .client
                 .tabledata()
                 .insert_all(
                     &self.config.common.project,
@@ -359,6 +361,12 @@
                 )
                 .await
                 .map_err(|e| SinkError::BigQuery(e.into()))?;
+            if let Some(error) = request.insert_errors {
+                return Err(SinkError::BigQuery(anyhow::anyhow!(
+                    "Insert error: {:?}",
+                    error
+                )));
+            }
         }
         Ok(())
     }
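
The new `bigquery.max_batch_rows` option is read from the sink's string-valued `WITH` options, so `DisplayFromStr` parses it into a `usize` and the serde default supplies 1024 when it is omitted; the writer then flushes once the pending insert request reaches that many rows. A standalone sketch of just the option-parsing behavior, trimmed down to a single field and assuming the serde (with derive), serde_with, and serde_json crates, rather than the connector's real `BigQueryCommon`:

use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

fn default_max_batch_rows() -> usize {
    1024
}

// Trimmed-down stand-in for BigQueryCommon: only the batching option is kept.
#[serde_as]
#[derive(Deserialize, Debug)]
struct BatchOptions {
    // WITH options arrive as strings, so DisplayFromStr parses "2048" into a
    // usize; when the key is omitted, the serde default of 1024 applies.
    #[serde(rename = "bigquery.max_batch_rows", default = "default_max_batch_rows")]
    #[serde_as(as = "DisplayFromStr")]
    max_batch_rows: usize,
}

fn main() {
    let explicit: BatchOptions =
        serde_json::from_str(r#"{"bigquery.max_batch_rows": "2048"}"#).unwrap();
    let defaulted: BatchOptions = serde_json::from_str("{}").unwrap();
    assert_eq!(explicit.max_batch_rows, 2048);
    assert_eq!(defaulted.max_batch_rows, 1024);
}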

src/connector/src/sink/doris.rs

+2-7
@@ -38,7 +38,7 @@ use super::doris_starrocks_connector::{
     POOL_IDLE_TIMEOUT,
 };
 use super::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
-use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
+use crate::sink::encoder::{JsonEncoder, RowEncoder};
 use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
 use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam};

@@ -293,12 +293,7 @@ impl DorisSinkWriter {
             inserter_inner_builder: doris_insert_builder,
             is_append_only,
             client: None,
-            row_encoder: JsonEncoder::new_with_doris(
-                schema,
-                None,
-                TimestampHandlingMode::String,
-                decimal_map,
-            ),
+            row_encoder: JsonEncoder::new_with_doris(schema, None, decimal_map),
         })
     }

src/connector/src/sink/encoder/json.rs

+28-8
@@ -81,15 +81,14 @@ impl JsonEncoder {
     pub fn new_with_doris(
         schema: Schema,
         col_indices: Option<Vec<usize>>,
-        timestamp_handling_mode: TimestampHandlingMode,
         map: HashMap<String, (u8, u8)>,
     ) -> Self {
         Self {
             schema,
             col_indices,
             time_handling_mode: TimeHandlingMode::Milli,
             date_handling_mode: DateHandlingMode::String,
-            timestamp_handling_mode,
+            timestamp_handling_mode: TimestampHandlingMode::String,
             timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
             custom_json_type: CustomJsonType::Doris(map),
             kafka_connect: None,
@@ -99,21 +98,33 @@ impl JsonEncoder {
     pub fn new_with_starrocks(
         schema: Schema,
         col_indices: Option<Vec<usize>>,
-        timestamp_handling_mode: TimestampHandlingMode,
         map: HashMap<String, (u8, u8)>,
     ) -> Self {
         Self {
             schema,
             col_indices,
             time_handling_mode: TimeHandlingMode::Milli,
             date_handling_mode: DateHandlingMode::String,
-            timestamp_handling_mode,
+            timestamp_handling_mode: TimestampHandlingMode::String,
             timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
             custom_json_type: CustomJsonType::StarRocks(map),
             kafka_connect: None,
         }
     }

+    pub fn new_with_bigquery(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
+        Self {
+            schema,
+            col_indices,
+            time_handling_mode: TimeHandlingMode::Milli,
+            date_handling_mode: DateHandlingMode::String,
+            timestamp_handling_mode: TimestampHandlingMode::String,
+            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
+            custom_json_type: CustomJsonType::BigQuery,
+            kafka_connect: None,
+        }
+    }
+
     pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
         Self {
             kafka_connect: Some(Arc::new(kafka_connect)),
@@ -190,7 +201,16 @@ fn datum_to_json_object(
     custom_json_type: &CustomJsonType,
 ) -> ArrayResult<Value> {
     let scalar_ref = match datum {
-        None => return Ok(Value::Null),
+        None => {
+            if let CustomJsonType::BigQuery = custom_json_type
+                && matches!(field.data_type(), DataType::List(_))
+            {
+                // Bigquery need to convert null of array to empty array https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
+                return Ok(Value::Array(vec![]));
+            } else {
+                return Ok(Value::Null);
+            }
+        }
         Some(datum) => datum,
     };

@@ -237,7 +257,7 @@
                 }
                 json!(v_string)
             }
-            CustomJsonType::Es | CustomJsonType::None => {
+            CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => {
                 json!(v.to_text())
             }
         },
@@ -289,7 +309,7 @@
         }
         (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type {
             CustomJsonType::Es | CustomJsonType::StarRocks(_) => JsonbVal::from(jsonb_ref).take(),
-            CustomJsonType::Doris(_) | CustomJsonType::None => {
+            CustomJsonType::Doris(_) | CustomJsonType::None | CustomJsonType::BigQuery => {
                 json!(jsonb_ref.to_string())
             }
         },
@@ -340,7 +360,7 @@
                     "starrocks can't support struct".to_string(),
                 ));
             }
-            CustomJsonType::Es | CustomJsonType::None => {
+            CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => {
                 let mut map = Map::with_capacity(st.len());
                 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
                     st.iter()
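
For BigQuery the encoder change only affects how NULL is rendered for array columns: per the BigQuery data-type docs linked in the diff, NULL arrays are not accepted, so a NULL list datum is emitted as [] while every other NULL stays null. A small sketch of that rule in isolation (the enum and function below are simplified stand-ins, not the connector's types; assumes the serde_json crate):

use serde_json::Value;

// Simplified stand-ins for the encoder's types, for illustration only.
enum JsonFlavor {
    BigQuery,
    Plain,
}

enum FieldKind {
    List,
    Other,
}

// Mirrors the None arm of datum_to_json_object after this change: a NULL list
// becomes an empty JSON array under the BigQuery flavor; every other NULL
// stays a JSON null.
fn encode_null(flavor: JsonFlavor, kind: FieldKind) -> Value {
    match (flavor, kind) {
        (JsonFlavor::BigQuery, FieldKind::List) => Value::Array(vec![]),
        _ => Value::Null,
    }
}

fn main() {
    assert_eq!(encode_null(JsonFlavor::BigQuery, FieldKind::List), Value::Array(vec![]));
    assert_eq!(encode_null(JsonFlavor::BigQuery, FieldKind::Other), Value::Null);
    assert_eq!(encode_null(JsonFlavor::Plain, FieldKind::List), Value::Null);
}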

src/connector/src/sink/encoder/mod.rs

+2
@@ -144,6 +144,8 @@ pub enum CustomJsonType {
     Es,
     // starrocks' need jsonb is struct
     StarRocks(HashMap<String, (u8, u8)>),
+    // bigquery need null array -> []
+    BigQuery,
     None,
 }

src/connector/src/sink/starrocks.rs

+2-7
@@ -34,7 +34,7 @@ use with_options::WithOptions;
 use super::doris_starrocks_connector::{
     HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_SUCCESS_STATUS, STARROCKS_DELETE_SIGN,
 };
-use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
+use super::encoder::{JsonEncoder, RowEncoder};
 use super::writer::LogSinkerOf;
 use super::{SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
 use crate::sink::writer::SinkWriterExt;
@@ -358,12 +358,7 @@ impl StarrocksSinkWriter {
             inserter_innet_builder: starrocks_insert_builder,
             is_append_only,
             client: None,
-            row_encoder: JsonEncoder::new_with_starrocks(
-                schema,
-                None,
-                TimestampHandlingMode::String,
-                decimal_map,
-            ),
+            row_encoder: JsonEncoder::new_with_starrocks(schema, None, decimal_map),
         })
     }

src/connector/with_options_sink.yaml

+4
@@ -17,6 +17,10 @@ BigQueryConfig:
   - name: bigquery.table
     field_type: String
     required: true
+  - name: bigquery.max_batch_rows
+    field_type: usize
+    required: false
+    default: '1024'
   - name: region
     field_type: String
     required: false
