Skip to content

Commit 16c0c62

Browse files
authored
Merge branch 'main' into dylan/add_internal_tables_to_pg_class
2 parents 495db60 + 9b89bb0 commit 16c0c62

File tree

20 files changed

+306
-85
lines changed

20 files changed

+306
-85
lines changed

.github/workflows/intergration_tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ jobs:
3939
- schema-registry
4040
- mysql-cdc
4141
- postgres-cdc
42-
#- mysql-sink
42+
- mysql-sink
4343
- postgres-sink
4444
- iceberg-sink
4545
format: ["json", "protobuf"]

e2e_test/batch/basic/join.slt.part

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,25 @@ select * from t1 join t2 using(v1) join t3 using(v2);
3232
----
3333
2 1 3 3
3434

35+
statement ok
36+
set batch_parallelism = 1;
37+
38+
query IIIIII
39+
select * from t1 join t2 using(v1) join t3 using(v2);
40+
----
41+
2 1 3 3
42+
43+
statement ok
44+
set batch_parallelism = 1000;
45+
46+
query IIIIII
47+
select * from t1 join t2 using(v1) join t3 using(v2);
48+
----
49+
2 1 3 3
50+
51+
statement ok
52+
set batch_parallelism = 0;
53+
3554
statement ok
3655
create index i1 on t1(v1) include(v2);
3756

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
CREATE TABLE target_count (
2-
target_id VARCHAR(128),
2+
target_id VARCHAR(128) primary key,
33
target_count BIGINT
44
);

java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ public static TableSchema fromProto(ConnectorServiceProto.TableSchema tableSchem
102102
.collect(Collectors.toList()));
103103
}
104104

105+
/** @deprecated pk here is from Risingwave, it may not match the pk in the database */
106+
@Deprecated
105107
public List<String> getPrimaryKeys() {
106108
return primaryKeys;
107109
}

java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java

Lines changed: 102 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import com.risingwave.proto.Data;
2121
import io.grpc.Status;
2222
import java.sql.*;
23+
import java.util.ArrayList;
2324
import java.util.Iterator;
25+
import java.util.List;
2426
import java.util.stream.Collectors;
2527
import java.util.stream.IntStream;
2628
import org.slf4j.Logger;
@@ -30,10 +32,13 @@ public class JDBCSink extends SinkBase {
3032
public static final String INSERT_TEMPLATE = "INSERT INTO %s (%s) VALUES (%s)";
3133
private static final String DELETE_TEMPLATE = "DELETE FROM %s WHERE %s";
3234
private static final String UPDATE_TEMPLATE = "UPDATE %s SET %s WHERE %s";
35+
private static final String ERROR_REPORT_TEMPLATE = "Error when exec %s, message %s";
3336

3437
private final String tableName;
3538
private final Connection conn;
3639
private final String jdbcUrl;
40+
private final List<String> pkColumnNames;
41+
public static final String JDBC_COLUMN_NAME_KEY = "COLUMN_NAME";
3742

3843
private String updateDeleteConditionBuffer;
3944
private Object[] updateDeleteValueBuffer;
@@ -48,16 +53,38 @@ public JDBCSink(String tableName, String jdbcUrl, TableSchema tableSchema) {
4853
try {
4954
this.conn = DriverManager.getConnection(jdbcUrl);
5055
this.conn.setAutoCommit(false);
56+
this.pkColumnNames = getPkColumnNames(conn, tableName);
5157
} catch (SQLException e) {
52-
throw Status.INTERNAL.withCause(e).asRuntimeException();
58+
throw Status.INTERNAL
59+
.withDescription(
60+
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
61+
.asRuntimeException();
62+
}
63+
}
64+
65+
private static List<String> getPkColumnNames(Connection conn, String tableName) {
66+
List<String> pkColumnNames = new ArrayList<>();
67+
try {
68+
var pks = conn.getMetaData().getPrimaryKeys(null, null, tableName);
69+
while (pks.next()) {
70+
pkColumnNames.add(pks.getString(JDBC_COLUMN_NAME_KEY));
71+
}
72+
} catch (SQLException e) {
73+
throw Status.INTERNAL
74+
.withDescription(
75+
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
76+
.asRuntimeException();
5377
}
78+
LOG.info("detected pk {}", pkColumnNames);
79+
return pkColumnNames;
5480
}
5581

5682
public JDBCSink(Connection conn, TableSchema tableSchema, String tableName) {
5783
super(tableSchema);
5884
this.tableName = tableName;
5985
this.jdbcUrl = null;
6086
this.conn = conn;
87+
this.pkColumnNames = getPkColumnNames(conn, tableName);
6188
}
6289

6390
private PreparedStatement prepareStatement(SinkRow row) {
@@ -79,35 +106,75 @@ private PreparedStatement prepareStatement(SinkRow row) {
79106
}
80107
return stmt;
81108
} catch (SQLException e) {
82-
throw io.grpc.Status.INTERNAL.withCause(e).asRuntimeException();
109+
throw io.grpc.Status.INTERNAL
110+
.withDescription(
111+
String.format(
112+
ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
113+
.asRuntimeException();
83114
}
84115
case DELETE:
85-
String deleteCondition =
86-
getTableSchema().getPrimaryKeys().stream()
87-
.map(key -> key + " = ?")
88-
.collect(Collectors.joining(" AND "));
116+
String deleteCondition;
117+
if (this.pkColumnNames.isEmpty()) {
118+
deleteCondition =
119+
IntStream.range(0, getTableSchema().getNumColumns())
120+
.mapToObj(
121+
index ->
122+
getTableSchema().getColumnNames()[index]
123+
+ " = ?")
124+
.collect(Collectors.joining(" AND "));
125+
} else {
126+
deleteCondition =
127+
this.pkColumnNames.stream()
128+
.map(key -> key + " = ?")
129+
.collect(Collectors.joining(" AND "));
130+
}
89131
String deleteStmt = String.format(DELETE_TEMPLATE, tableName, deleteCondition);
90132
try {
91133
int placeholderIdx = 1;
92134
PreparedStatement stmt =
93135
conn.prepareStatement(deleteStmt, Statement.RETURN_GENERATED_KEYS);
94-
for (String primaryKey : getTableSchema().getPrimaryKeys()) {
136+
for (String primaryKey : this.pkColumnNames) {
95137
Object fromRow = getTableSchema().getFromRow(primaryKey, row);
96138
stmt.setObject(placeholderIdx++, fromRow);
97139
}
98140
return stmt;
99141
} catch (SQLException e) {
100-
throw Status.INTERNAL.withCause(e).asRuntimeException();
142+
throw Status.INTERNAL
143+
.withDescription(
144+
String.format(
145+
ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
146+
.asRuntimeException();
101147
}
102148
case UPDATE_DELETE:
103-
updateDeleteConditionBuffer =
104-
getTableSchema().getPrimaryKeys().stream()
105-
.map(key -> key + " = ?")
106-
.collect(Collectors.joining(" AND "));
107-
updateDeleteValueBuffer =
108-
getTableSchema().getPrimaryKeys().stream()
109-
.map(key -> getTableSchema().getFromRow(key, row))
110-
.toArray();
149+
if (this.pkColumnNames.isEmpty()) {
150+
updateDeleteConditionBuffer =
151+
IntStream.range(0, getTableSchema().getNumColumns())
152+
.mapToObj(
153+
index ->
154+
getTableSchema().getColumnNames()[index]
155+
+ " = ?")
156+
.collect(Collectors.joining(" AND "));
157+
updateDeleteValueBuffer =
158+
IntStream.range(0, getTableSchema().getNumColumns())
159+
.mapToObj(
160+
index ->
161+
getTableSchema()
162+
.getFromRow(
163+
getTableSchema()
164+
.getColumnNames()[
165+
index],
166+
row))
167+
.toArray();
168+
} else {
169+
updateDeleteConditionBuffer =
170+
this.pkColumnNames.stream()
171+
.map(key -> key + " = ?")
172+
.collect(Collectors.joining(" AND "));
173+
updateDeleteValueBuffer =
174+
this.pkColumnNames.stream()
175+
.map(key -> getTableSchema().getFromRow(key, row))
176+
.toArray();
177+
}
111178
LOG.debug(
112179
"update delete condition: {} on values {}",
113180
updateDeleteConditionBuffer,
@@ -144,7 +211,11 @@ private PreparedStatement prepareStatement(SinkRow row) {
144211
updateDeleteValueBuffer = null;
145212
return stmt;
146213
} catch (SQLException e) {
147-
throw Status.INTERNAL.withCause(e).asRuntimeException();
214+
throw Status.INTERNAL
215+
.withDescription(
216+
String.format(
217+
ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
218+
.asRuntimeException();
148219
}
149220
default:
150221
throw Status.INVALID_ARGUMENT
@@ -163,10 +234,14 @@ public void write(Iterator<SinkRow> rows) {
163234
}
164235
if (stmt != null) {
165236
try {
166-
LOG.debug("Executing statement: " + stmt);
237+
LOG.debug("Executing statement: {}", stmt);
167238
stmt.executeUpdate();
168239
} catch (SQLException e) {
169-
throw Status.INTERNAL.withCause(e).asRuntimeException();
240+
throw Status.INTERNAL
241+
.withDescription(
242+
String.format(
243+
ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
244+
.asRuntimeException();
170245
}
171246
} else {
172247
throw Status.INTERNAL
@@ -187,7 +262,10 @@ public void sync() {
187262
try {
188263
conn.commit();
189264
} catch (SQLException e) {
190-
throw io.grpc.Status.INTERNAL.withCause(e).asRuntimeException();
265+
throw io.grpc.Status.INTERNAL
266+
.withDescription(
267+
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
268+
.asRuntimeException();
191269
}
192270
}
193271

@@ -196,7 +274,10 @@ public void drop() {
196274
try {
197275
conn.close();
198276
} catch (SQLException e) {
199-
throw io.grpc.Status.INTERNAL.withCause(e).asRuntimeException();
277+
throw io.grpc.Status.INTERNAL
278+
.withDescription(
279+
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
280+
.asRuntimeException();
200281
}
201282
}
202283

src/common/src/session_config/mod.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ mod search_path;
1717
mod transaction_isolation_level;
1818
mod visibility_mode;
1919

20+
use std::num::NonZeroU64;
2021
use std::ops::Deref;
2122

2223
use chrono_tz::Tz;
@@ -33,7 +34,7 @@ use crate::util::epoch::Epoch;
3334

3435
// This is a hack, &'static str is not allowed as a const generics argument.
3536
// TODO: refine this using the adt_const_params feature.
36-
const CONFIG_KEYS: [&str; 20] = [
37+
const CONFIG_KEYS: [&str; 21] = [
3738
"RW_IMPLICIT_FLUSH",
3839
"CREATE_COMPACTION_GROUP_FOR_MV",
3940
"QUERY_MODE",
@@ -54,6 +55,7 @@ const CONFIG_KEYS: [&str; 20] = [
5455
"RW_FORCE_TWO_PHASE_AGG",
5556
"RW_ENABLE_SHARE_PLAN",
5657
"INTERVALSTYLE",
58+
"BATCH_PARALLELISM",
5759
];
5860

5961
// MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] =
@@ -78,6 +80,7 @@ const ENABLE_TWO_PHASE_AGG: usize = 16;
7880
const FORCE_TWO_PHASE_AGG: usize = 17;
7981
const RW_ENABLE_SHARE_PLAN: usize = 18;
8082
const INTERVAL_STYLE: usize = 19;
83+
const BATCH_PARALLELISM: usize = 20;
8184

8285
trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> {
8386
fn entry_name() -> &'static str;
@@ -278,6 +281,7 @@ type EnableTwoPhaseAgg = ConfigBool<ENABLE_TWO_PHASE_AGG, true>;
278281
type ForceTwoPhaseAgg = ConfigBool<FORCE_TWO_PHASE_AGG, false>;
279282
type EnableSharePlan = ConfigBool<RW_ENABLE_SHARE_PLAN, true>;
280283
type IntervalStyle = ConfigString<INTERVAL_STYLE>;
284+
type BatchParallelism = ConfigU64<BATCH_PARALLELISM, 0>;
281285

282286
#[derive(Derivative)]
283287
#[derivative(Default)]
@@ -354,6 +358,8 @@ pub struct ConfigMap {
354358

355359
/// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-INTERVALSTYLE>
356360
interval_style: IntervalStyle,
361+
362+
batch_parallelism: BatchParallelism,
357363
}
358364

359365
impl ConfigMap {
@@ -410,6 +416,8 @@ impl ConfigMap {
410416
self.enable_share_plan = val.as_slice().try_into()?;
411417
} else if key.eq_ignore_ascii_case(IntervalStyle::entry_name()) {
412418
self.interval_style = val.as_slice().try_into()?;
419+
} else if key.eq_ignore_ascii_case(BatchParallelism::entry_name()) {
420+
self.batch_parallelism = val.as_slice().try_into()?;
413421
} else {
414422
return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into());
415423
}
@@ -458,6 +466,8 @@ impl ConfigMap {
458466
Ok(self.enable_share_plan.to_string())
459467
} else if key.eq_ignore_ascii_case(IntervalStyle::entry_name()) {
460468
Ok(self.interval_style.to_string())
469+
} else if key.eq_ignore_ascii_case(BatchParallelism::entry_name()) {
470+
Ok(self.batch_parallelism.to_string())
461471
} else {
462472
Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into())
463473
}
@@ -560,6 +570,11 @@ impl ConfigMap {
560570
setting : self.interval_style.to_string(),
561571
description : String::from("It is typically set by an application upon connection to the server.")
562572
},
573+
VariableInfo{
574+
name : BatchParallelism::entry_name().to_lowercase(),
575+
setting : self.batch_parallelism.to_string(),
576+
description: String::from("Sets the parallelism for batch. If 0, use default value.")
577+
},
563578
]
564579
}
565580

@@ -648,4 +663,11 @@ impl ConfigMap {
648663
pub fn get_interval_style(&self) -> &str {
649664
&self.interval_style
650665
}
666+
667+
pub fn get_batch_parallelism(&self) -> Option<NonZeroU64> {
668+
if self.batch_parallelism.0 != 0 {
669+
return Some(NonZeroU64::new(self.batch_parallelism.0).unwrap());
670+
}
671+
None
672+
}
651673
}

src/connector/src/source/base.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,7 @@ pub type DataType = risingwave_common::types::DataType;
363363
pub struct Column {
364364
pub name: String,
365365
pub data_type: DataType,
366+
pub is_visible: bool,
366367
}
367368

368369
/// Split id resides in every source message, use `Arc` to avoid copying.

0 commit comments

Comments
 (0)