
Commit 384a41f

Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-schema-event
2 parents d68aca5 + aba3232 commit 384a41f

12 files changed: +143 -58 lines


ci/scripts/common.sh

Lines changed: 13 additions & 0 deletions
@@ -115,3 +115,16 @@ get_latest_kafka_download_url() {
     local download_url="https://downloads.apache.org/kafka/${latest_version}/kafka_2.13-${latest_version}.tgz"
     echo "$download_url"
 }
+
+get_latest_cassandra_version() {
+    local versions=$(curl -s https://downloads.apache.org/cassandra/ | grep -Eo 'href="[0-9]+\.[0-9]+\.[0-9]+/"' | grep -Eo "[0-9]+\.[0-9]+\.[0-9]+")
+    # Sort the version numbers and get the latest one
+    local latest_version=$(echo "$versions" | sort -V | tail -n1)
+    echo "$latest_version"
+}
+
+get_latest_cassandra_download_url() {
+    local latest_version=$(get_latest_cassandra_version)
+    local download_url="https://downloads.apache.org/cassandra/${latest_version}/apache-cassandra-${latest_version}-bin.tar.gz"
+    echo "$download_url"
+}
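The helpers above pick the newest release by scraping the Apache download index and running the candidates through `sort -V`, which orders dotted version strings numerically rather than lexically. A rough Java sketch of that ordering, using a made-up version list purely for illustration:

import java.util.Arrays;
import java.util.List;

public class LatestVersionSketch {
    // Rough equivalent of `sort -V | tail -n1`: compare dotted versions component by component.
    static int compareVersions(String left, String right) {
        int[] a = Arrays.stream(left.split("\\.")).mapToInt(Integer::parseInt).toArray();
        int[] b = Arrays.stream(right.split("\\.")).mapToInt(Integer::parseInt).toArray();
        for (int i = 0; i < Math.max(a.length, b.length); i++) {
            int x = i < a.length ? a[i] : 0;
            int y = i < b.length ? b[i] : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        // Hypothetical versions scraped from the download index.
        List<String> versions = List.of("4.0.13", "4.1.5", "5.0.1", "4.1.10");
        String latest = versions.stream().max(LatestVersionSketch::compareVersions).orElseThrow();
        System.out.println(latest); // 5.0.1 -- note that 4.1.10 correctly sorts above 4.1.5
    }
}

Plain lexical sorting would have ranked 4.1.10 below 4.1.5, which is exactly the failure mode `sort -V` avoids.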

ci/scripts/e2e-cassandra-sink-test.sh

Lines changed: 7 additions & 23 deletions
@@ -36,37 +36,21 @@ risedev ci-start ci-sink-test
 # Wait cassandra server to start
 sleep 40
 
-echo "--- create cassandra table"
-curl https://downloads.apache.org/cassandra/4.1.3/apache-cassandra-4.1.3-bin.tar.gz --output apache-cassandra-4.1.3-bin.tar.gz
-tar xfvz apache-cassandra-4.1.3-bin.tar.gz
+echo "--- install cassandra"
+wget $(get_latest_cassandra_download_url) -O cassandra_latest.tar.gz
+tar xfvz cassandra_latest.tar.gz
+export LATEST_CASSANDRA_VERSION=$(get_latest_cassandra_version)
+export CASSANDRA_DIR="./apache-cassandra-${LATEST_CASSANDRA_VERSION}"
 # remove bundled packages, and use installed packages, because Python 3.12 has removed asyncore, but I failed to install libev support for bundled Python driver.
-rm apache-cassandra-4.1.3/lib/six-1.12.0-py2.py3-none-any.zip
-rm apache-cassandra-4.1.3/lib/cassandra-driver-internal-only-3.25.0.zip
+rm ${CASSANDRA_DIR}/lib/six-1.12.0-py2.py3-none-any.zip
+rm ${CASSANDRA_DIR}/lib/cassandra-driver-internal-only-3.25.0.zip
 apt-get install -y libev4 libev-dev
 pip3 install --break-system-packages cassandra-driver
-
-cd apache-cassandra-4.1.3/bin
 export CQLSH_HOST=cassandra-server
 export CQLSH_PORT=9042
-./cqlsh --request-timeout=20 -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;
-CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);"
 
 echo "--- testing sinks"
-cd ../../
 sqllogictest -p 4566 -d dev './e2e_test/sink/cassandra_sink.slt'
-sleep 1
-cd apache-cassandra-4.1.3/bin
-./cqlsh --request-timeout=20 -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"
-
-if cat ./query_result.csv | awk -F "," '{
-  exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01.000+0000" && $9 == "False\r"); }'; then
-  echo "Cassandra sink check passed"
-else
-  echo "The output is not as expected."
-  echo "output:"
-  cat ./query_result.csv
-  exit 1
-fi
 
 echo "--- Kill cluster"
 cd ../../

ci/workflows/main-cron.yml

Lines changed: 18 additions & 19 deletions
@@ -901,25 +901,24 @@ steps:
     timeout_in_minutes: 10
     retry: *auto-retry
 
-  # FIXME(xxhZs): https://github.com/risingwavelabs/risingwave/issues/17855
-  # - label: "end-to-end cassandra sink test"
-  #   key: "e2e-cassandra-sink-tests"
-  #   command: "ci/scripts/e2e-cassandra-sink-test.sh -p ci-release"
-  #   if: |
-  #     !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
-  #     || build.pull_request.labels includes "ci/run-e2e-cassandra-sink-tests"
-  #     || build.env("CI_STEPS") =~ /(^|,)e2e-cassandra-sink-tests?(,|$$)/
-  #   depends_on:
-  #     - "build"
-  #     - "build-other"
-  #   plugins:
-  #     - docker-compose#v5.1.0:
-  #         run: sink-test-env
-  #         config: ci/docker-compose.yml
-  #         mount-buildkite-agent: true
-  #     - ./ci/plugins/upload-failure-logs
-  #   timeout_in_minutes: 10
-  #   retry: *auto-retry
+  - label: "end-to-end cassandra sink test"
+    key: "e2e-cassandra-sink-tests"
+    command: "ci/scripts/e2e-cassandra-sink-test.sh -p ci-release"
+    if: |
+      !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
+      || build.pull_request.labels includes "ci/run-e2e-cassandra-sink-tests"
+      || build.env("CI_STEPS") =~ /(^|,)e2e-cassandra-sink-tests?(,|$$)/
+    depends_on:
+      - "build"
+      - "build-other"
+    plugins:
+      - docker-compose#v5.1.0:
+          run: sink-test-env
+          config: ci/docker-compose.yml
+          mount-buildkite-agent: true
+      - ./ci/plugins/upload-failure-logs
+    timeout_in_minutes: 10
+    retry: *auto-retry
 
   - label: "end-to-end clickhouse sink test"
     key: "e2e-clickhouse-sink-tests"

e2e_test/error_ui/simple/main.slt

Lines changed: 3 additions & 4 deletions
@@ -27,11 +27,10 @@ db error: ERROR: Failed to run the query
 Caused by these errors (recent errors listed first):
   1: failed to check UDF signature
   2: failed to send requests to UDF service
-  3: status: Unavailable, message: "error trying to connect: tcp connect error: deadline has elapsed", details: [], metadata: MetadataMap { headers: {} }
+  3: status: Unknown, message: "transport error", details: [], metadata: MetadataMap { headers: {} }
   4: transport error
-  5: error trying to connect
-  6: tcp connect error
-  7: deadline has elapsed
+  5: connection error
+  6: connection reset
 
 
 statement error

e2e_test/sink/cassandra_sink.slt

Lines changed: 46 additions & 4 deletions
@@ -1,13 +1,19 @@
+system ok
+${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);"
+
+system ok
+${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "use demo;CREATE table \"Test_uppercase\"(\"TEST_V1\" int primary key, \"TEST_V2\" int,\"TEST_V3\" int);"
+
 statement ok
 CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean);
 
 statement ok
-CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
+CREATE TABLE t7 ("TEST_V1" int primary key, "TEST_V2" int, "TEST_V3" int);
 
 statement ok
 CREATE SINK s6
 FROM
-    mv6 WITH (
+    t6 WITH (
     connector = 'cassandra',
     type = 'append-only',
    force_append_only='true',
@@ -17,17 +23,53 @@ FROM
     cassandra.datacenter = 'datacenter1',
 );
 
+statement ok
+CREATE SINK s7
+FROM
+    t7 WITH (
+    connector = 'cassandra',
+    type = 'append-only',
+    force_append_only='true',
+    cassandra.url = 'cassandra-server:9042',
+    cassandra.keyspace = 'demo',
+    cassandra.table = 'Test_uppercase',
+    cassandra.datacenter = 'datacenter1',
+);
+
 statement ok
 INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false);
 
+statement ok
+INSERT INTO t7 VALUES (1, 1, 1);
+
 statement ok
 FLUSH;
 
 statement ok
 DROP SINK s6;
 
 statement ok
-DROP MATERIALIZED VIEW mv6;
+DROP TABLE t6;
+
+statement ok
+DROP SINK s7;
 
 statement ok
-DROP TABLE t6;
+DROP TABLE t7;
+
+system ok
+${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"
+
+system ok
+${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "COPY demo.\"Test_uppercase\" TO './query_result2.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"
+
+system ok
+cat ./query_result.csv
+----
+1,1,1,1.1,1.2,test,2013-01-01,2013-01-01 01:01:01.000+0000,False
+
+
+system ok
+cat ./query_result2.csv
+----
+1,1,1

java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ public CassandraConfig(
             @JsonProperty(value = "type") String type) {
         this.url = url;
         this.keyspace = keyspace;
-        this.table = table;
+        this.table = CassandraUtil.convertCQLIdentifiers(table);
         this.datacenter = datacenter;
         this.type = type;
     }

java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java

Lines changed: 11 additions & 4 deletions
@@ -193,8 +193,15 @@ public void drop() {
 
     private String createInsertStatement(String tableName, TableSchema tableSchema) {
         String[] columnNames = tableSchema.getColumnNames();
-        String columnNamesString = String.join(", ", columnNames);
+        String columnNamesString =
+                Arrays.stream(columnNames)
+                        .map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName))
+                        .collect(Collectors.joining(", "));
         String placeholdersString = String.join(", ", Collections.nCopies(columnNames.length, "?"));
+        System.out.println(
+                String.format(
+                        "INSERT INTO %s (%s) VALUES (%s)",
+                        tableName, columnNamesString, placeholdersString));
         return String.format(
                 "INSERT INTO %s (%s) VALUES (%s)",
                 tableName, columnNamesString, placeholdersString);
@@ -204,11 +211,11 @@ private String createUpdateStatement(String tableName, TableSchema tableSchema)
         List<String> primaryKeys = tableSchema.getPrimaryKeys();
         String setClause = // cassandra does not allow SET on primary keys
                 nonKeyColumns.stream()
-                        .map(columnName -> columnName + " = ?")
+                        .map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
                         .collect(Collectors.joining(", "));
         String whereClause =
                 primaryKeys.stream()
-                        .map(columnName -> columnName + " = ?")
+                        .map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
                         .collect(Collectors.joining(" AND "));
         return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause);
     }
@@ -217,7 +224,7 @@ private static String createDeleteStatement(String tableName, TableSchema tableS
         List<String> primaryKeys = tableSchema.getPrimaryKeys();
         String whereClause =
                 primaryKeys.stream()
-                        .map(columnName -> columnName + " = ?")
+                        .map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
                         .collect(Collectors.joining(" AND "));
         return String.format("DELETE FROM %s WHERE %s", tableName, whereClause);
     }

java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java

Lines changed: 4 additions & 0 deletions
@@ -167,4 +167,8 @@ public static Object convertRow(Object value, TypeName typeName) {
                 .asRuntimeException();
         }
     }
+
+    public static String convertCQLIdentifiers(String identifier) {
+        return "\"" + identifier + "\"";
+    }
 }
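convertCQLIdentifiers wraps every table and column name in double quotes. In CQL a quoted identifier keeps its exact case, while an unquoted one is folded to lowercase, which is what lets the sink write to tables such as "Test_uppercase" exercised in the new slt test. A minimal sketch of the resulting statement shape; the quote helper below mirrors convertCQLIdentifiers, and the column names are taken from the test above:

import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Collectors;

public class QuotedIdentifierSketch {
    // Mirrors CassandraUtil.convertCQLIdentifiers: a double-quoted CQL identifier keeps its case.
    static String quote(String identifier) {
        return "\"" + identifier + "\"";
    }

    public static void main(String[] args) {
        String[] columns = {"TEST_V1", "TEST_V2", "TEST_V3"}; // schema from the slt test above
        String columnList =
                Arrays.stream(columns).map(QuotedIdentifierSketch::quote).collect(Collectors.joining(", "));
        String placeholders = String.join(", ", Collections.nCopies(columns.length, "?"));
        // Unquoted, Cassandra would fold these names to test_v1, test_v2, ... and the statement
        // would not match the case-sensitive schema created in the test.
        System.out.printf(
                "INSERT INTO %s (%s) VALUES (%s)%n", quote("Test_uppercase"), columnList, placeholders);
        // -> INSERT INTO "Test_uppercase" ("TEST_V1", "TEST_V2", "TEST_V3") VALUES (?, ?, ?)
    }
}

Note that with quoting in place, the names RisingWave sends must match the case stored on the Cassandra side exactly.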

src/connector/src/sink/iceberg/mod.rs

Lines changed: 21 additions & 1 deletion
@@ -123,6 +123,13 @@ pub struct IcebergConfig {
     #[serde(rename = "s3.secret.key")]
     pub secret_key: String,
 
+    #[serde(
+        rename = "s3.path.style.access",
+        default,
+        deserialize_with = "deserialize_bool_from_string"
+    )]
+    pub path_style_access: bool,
+
     #[serde(
         rename = "primary_key",
         default,
@@ -270,6 +277,10 @@ impl IcebergConfig {
             "iceberg.table.io.secret_access_key".to_string(),
             self.secret_key.clone().to_string(),
         );
+        iceberg_configs.insert(
+            "iceberg.table.io.enable_virtual_host_style".to_string(),
+            (!self.path_style_access).to_string(),
+        );
 
         let (bucket, root) = {
             let url = Url::parse(&self.path).map_err(|e| SinkError::Iceberg(anyhow!(e)))?;
@@ -409,7 +420,10 @@ impl IcebergConfig {
             "s3.secret-access-key".to_string(),
             self.secret_key.clone().to_string(),
         );
-
+        java_catalog_configs.insert(
+            "s3.path-style-access".to_string(),
+            self.path_style_access.to_string(),
+        );
         if matches!(self.catalog_type.as_deref(), Some("glue")) {
             java_catalog_configs.insert(
                 "client.credentials-provider".to_string(),
@@ -1286,6 +1300,7 @@ mod test {
             ("s3.endpoint", "http://127.0.0.1:9301"),
             ("s3.access.key", "hummockadmin"),
             ("s3.secret.key", "hummockadmin"),
+            ("s3.path.style.access", "true"),
             ("s3.region", "us-east-1"),
             ("catalog.type", "jdbc"),
             ("catalog.name", "demo"),
@@ -1315,6 +1330,7 @@
             endpoint: Some("http://127.0.0.1:9301".to_string()),
             access_key: "hummockadmin".to_string(),
             secret_key: "hummockadmin".to_string(),
+            path_style_access: true,
             primary_key: Some(vec!["v1".to_string()]),
             java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                 .into_iter()
@@ -1350,6 +1366,7 @@
             ("s3.access.key", "hummockadmin"),
             ("s3.secret.key", "hummockadmin"),
             ("s3.region", "us-east-1"),
+            ("s3.path.style.access", "true"),
             ("catalog.name", "demo"),
             ("catalog.type", "storage"),
             ("warehouse.path", "s3://icebergdata/demo"),
@@ -1374,6 +1391,7 @@
             ("s3.access.key", "hummockadmin"),
             ("s3.secret.key", "hummockadmin"),
             ("s3.region", "us-east-1"),
+            ("s3.path.style.access", "true"),
             ("catalog.name", "demo"),
             ("catalog.type", "rest"),
             ("catalog.uri", "http://192.168.167.4:8181"),
@@ -1399,6 +1417,7 @@
             ("s3.access.key", "hummockadmin"),
             ("s3.secret.key", "hummockadmin"),
             ("s3.region", "us-east-1"),
+            ("s3.path.style.access", "true"),
             ("catalog.name", "demo"),
             ("catalog.type", "jdbc"),
             ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
@@ -1426,6 +1445,7 @@
             ("s3.access.key", "hummockadmin"),
             ("s3.secret.key", "hummockadmin"),
             ("s3.region", "us-east-1"),
+            ("s3.path.style.access", "true"),
             ("catalog.name", "demo"),
             ("catalog.type", "hive"),
             ("catalog.uri", "thrift://localhost:9083"),

src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs

Lines changed: 11 additions & 2 deletions
@@ -14,6 +14,7 @@
 
 use std::marker::PhantomData;
 
+use anyhow::anyhow;
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
 use futures::stream::{self, BoxStream};
@@ -51,14 +52,22 @@ impl<Src: OpendalSource> SplitEnumerator for OpendalEnumerator<Src> {
 
     async fn list_splits(&mut self) -> ConnectorResult<Vec<OpendalFsSplit<Src>>> {
         let empty_split: OpendalFsSplit<Src> = OpendalFsSplit::empty_split();
+        let prefix = self.prefix.as_deref().unwrap_or("/");
 
-        Ok(vec![empty_split])
+        match self.op.list(prefix).await {
+            Ok(_) => return Ok(vec![empty_split]),
+            Err(e) => {
+                return Err(anyhow!(e)
+                    .context("fail to create source, please check your config.")
+                    .into())
+            }
+        }
     }
 }
 
 impl<Src: OpendalSource> OpendalEnumerator<Src> {
     pub async fn list(&self) -> ConnectorResult<ObjectMetadataIter> {
-        let prefix = self.prefix.as_deref().unwrap_or("");
+        let prefix = self.prefix.as_deref().unwrap_or("/");
 
         let object_lister = self
             .op
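list_splits now issues a probe list on the configured prefix when the source is created, so a bad bucket, credential, or path surfaces as an immediate error instead of a source that silently never produces data; the listing result itself is discarded and the usual empty split is still returned. A loose analogy in plain Java against the local filesystem, purely to illustrate the fail-fast idea and not the connector's actual API:

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class ValidateOnCreateSketch {
    // Probe the configured prefix once at creation time and discard the results.
    // A bad prefix fails here, mirroring the error path added to list_splits().
    static void validatePrefix(Path prefix) throws IOException {
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(prefix)) {
            // We only care that listing works, not what it returns.
        } catch (IOException e) {
            throw new IOException("fail to create source, please check your config.", e);
        }
    }

    public static void main(String[] args) throws IOException {
        validatePrefix(Path.of("."));                  // an existing prefix passes silently
        // validatePrefix(Path.of("./no-such-prefix")); // a bad one would fail fast here
    }
}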

src/connector/with_options_sink.yaml

Lines changed: 4 additions & 0 deletions
@@ -272,6 +272,10 @@ IcebergConfig:
   - name: s3.secret.key
     field_type: String
     required: true
+  - name: s3.path.style.access
+    field_type: bool
+    required: false
+    default: Default::default
   - name: primary_key
     field_type: Vec<String>
     required: false

src/stream/src/executor/source/list_executor.rs

Lines changed: 4 additions & 0 deletions
@@ -99,6 +99,10 @@ impl<S: StateStore> FsListExecutor<S> {
             .collect::<Vec<_>>();
 
         let res: Vec<(Op, OwnedRow)> = rows.into_iter().flatten().collect();
+        if res.is_empty() {
+            tracing::warn!("No items were listed from source.");
+            return Ok(StreamChunk::default());
+        }
         Ok(StreamChunk::from_rows(
             &res,
             &[DataType::Varchar, DataType::Timestamptz, DataType::Int64],
