Skip to content

Commit 44943ec

Browse files
tabVersionlmatz
andauthored
fix: display error message on the frontend (risingwavelabs#8638)
Signed-off-by: tabVersion <[email protected]> Co-authored-by: lmatz <[email protected]>
1 parent 1dee824 commit 44943ec

File tree

2 files changed

+31
-3
lines changed

2 files changed

+31
-3
lines changed

java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,18 @@ public void validate(
5656
String tableName = tableProperties.get(TABLE_NAME_PROP);
5757
Set<String> jdbcColumns = new HashSet<>();
5858
Set<String> jdbcPk = new HashSet<>();
59+
Set<String> jdbcTableNames = new HashSet<>();
5960

6061
try (Connection conn = DriverManager.getConnection(jdbcUrl);
62+
ResultSet tableNamesResultSet =
63+
conn.getMetaData().getTables(null, null, "%", null);
6164
ResultSet columnResultSet =
6265
conn.getMetaData().getColumns(null, null, tableName, null);
6366
ResultSet pkResultSet =
6467
conn.getMetaData().getPrimaryKeys(null, null, tableName); ) {
68+
while (tableNamesResultSet.next()) {
69+
jdbcTableNames.add(tableNamesResultSet.getString("TABLE_NAME"));
70+
}
6571
while (columnResultSet.next()) {
6672
jdbcColumns.add(columnResultSet.getString("COLUMN_NAME"));
6773
}
@@ -72,6 +78,12 @@ public void validate(
7278
throw Status.INTERNAL.withCause(e).asRuntimeException();
7379
}
7480

81+
if (!jdbcTableNames.contains(tableName)) {
82+
throw Status.INVALID_ARGUMENT
83+
.withDescription("table not found: " + tableName)
84+
.asRuntimeException();
85+
}
86+
7587
// Check that all columns in tableSchema exist in the JDBC table.
7688
for (String sinkColumn : tableSchema.getColumnNames()) {
7789
if (!jdbcColumns.contains(sinkColumn)) {

src/connector/src/sink/remote.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,12 @@ impl<const APPEND_ONLY: bool> RemoteSink<APPEND_ONLY> {
125125
})?;
126126
let host_addr = HostAddr::try_from(&address).map_err(SinkError::from)?;
127127
let client = ConnectorClient::new(host_addr).await.map_err(|err| {
128-
SinkError::Remote(format!(
128+
let msg = format!(
129129
"failed to connect to connector endpoint `{}`: {:?}",
130130
&address, err
131-
))
131+
);
132+
tracing::warn!(msg);
133+
SinkError::Remote(msg)
132134
})?;
133135

134136
let table_schema = Some(TableSchema {
@@ -151,7 +153,21 @@ impl<const APPEND_ONLY: bool> RemoteSink<APPEND_ONLY> {
151153
)
152154
.await
153155
.map_err(SinkError::from)?;
154-
let _ = response.next().await.unwrap();
156+
response.next().await.unwrap().map_err(|e| {
157+
let msg = format!(
158+
"failed to start sink stream for connector `{}` with error code: {}, message: {:?}",
159+
&config.connector_type,
160+
e.code(),
161+
e.message()
162+
);
163+
tracing::warn!(msg);
164+
SinkError::Remote(msg)
165+
})?;
166+
tracing::info!(
167+
"{:?} sink stream started with properties: {:?}",
168+
&config.connector_type,
169+
&config.properties
170+
);
155171

156172
Ok(RemoteSink {
157173
connector_type: config.connector_type,

0 commit comments

Comments
 (0)