Skip to content

Commit 8183b41

Browse files
authored
feat(meta): introduce sink validation in meta (risingwavelabs#8417)
1 parent fbcd407 commit 8183b41

File tree

12 files changed

+296
-121
lines changed

12 files changed

+296
-121
lines changed

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkValidationHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,13 @@ public void handle(ConnectorServiceProto.ValidateSinkRequest request) {
4343
ConnectorServiceProto.ValidateSinkResponse.newBuilder()
4444
.setError(
4545
ConnectorServiceProto.ValidationError.newBuilder()
46-
.setErrorMessage(e.toString())
46+
.setErrorMessage(e.getMessage())
4747
.build())
4848
.build());
4949
responseObserver.onCompleted();
5050
}
51+
52+
responseObserver.onNext(ConnectorServiceProto.ValidateSinkResponse.newBuilder().build());
53+
responseObserver.onCompleted();
5154
}
5255
}

src/common/src/util/addr.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@
1515
use std::net::SocketAddr;
1616
use std::str::FromStr;
1717

18+
use anyhow::anyhow;
1819
use risingwave_pb::common::HostAddress as ProstHostAddress;
1920

20-
use crate::error::{internal_error, Result};
21-
2221
/// General host address and port.
2322
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2423
pub struct HostAddr {
@@ -41,33 +40,33 @@ impl From<SocketAddr> for HostAddr {
4140
}
4241

4342
impl TryFrom<&str> for HostAddr {
44-
type Error = crate::error::RwError;
43+
type Error = anyhow::Error;
4544

46-
fn try_from(s: &str) -> Result<Self> {
47-
let addr = url::Url::parse(&format!("http://{}", s))
48-
.map_err(|e| internal_error(format!("{}: {}", e, s)))?;
45+
fn try_from(s: &str) -> Result<Self, Self::Error> {
46+
let addr =
47+
url::Url::parse(&format!("http://{}", s)).map_err(|e| anyhow!("{}: {}", e, s))?;
4948
Ok(HostAddr {
5049
host: addr
5150
.host()
52-
.ok_or_else(|| internal_error("invalid host"))?
51+
.ok_or_else(|| anyhow!("invalid host"))?
5352
.to_string(),
54-
port: addr.port().ok_or_else(|| internal_error("invalid port"))?,
53+
port: addr.port().ok_or_else(|| anyhow!("invalid port"))?,
5554
})
5655
}
5756
}
5857

5958
impl TryFrom<&String> for HostAddr {
60-
type Error = crate::error::RwError;
59+
type Error = anyhow::Error;
6160

62-
fn try_from(s: &String) -> Result<Self> {
61+
fn try_from(s: &String) -> Result<Self, Self::Error> {
6362
Self::try_from(s.as_str())
6463
}
6564
}
6665

6766
impl FromStr for HostAddr {
68-
type Err = crate::error::RwError;
67+
type Err = anyhow::Error;
6968

70-
fn from_str(s: &str) -> Result<Self> {
69+
fn from_str(s: &str) -> Result<Self, Self::Err> {
7170
Self::try_from(s)
7271
}
7372
}

src/connector/src/sink/catalog/mod.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ pub mod desc;
1717
use std::collections::HashMap;
1818

1919
use itertools::Itertools;
20-
use risingwave_common::catalog::{ColumnCatalog, DatabaseId, SchemaId, TableId, UserId};
20+
use risingwave_common::catalog::{
21+
ColumnCatalog, DatabaseId, Field, Schema, SchemaId, TableId, UserId,
22+
};
2123
use risingwave_common::util::sort_util::ColumnOrder;
2224
use risingwave_pb::catalog::{Sink as ProstSink, SinkType as ProstSinkType};
2325

@@ -165,6 +167,19 @@ impl SinkCatalog {
165167
sink_type: self.sink_type.to_proto() as i32,
166168
}
167169
}
170+
171+
pub fn schema(&self) -> Schema {
172+
let fields = self
173+
.columns
174+
.iter()
175+
.map(|column| Field::from(column.column_desc.clone()))
176+
.collect_vec();
177+
Schema { fields }
178+
}
179+
180+
pub fn pk_indices(&self) -> Vec<usize> {
181+
self.pk.iter().map(|k| k.column_index).collect_vec()
182+
}
168183
}
169184

170185
impl From<ProstSink> for SinkCatalog {

src/connector/src/sink/mod.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use serde::{Deserialize, Serialize};
3131
use thiserror::Error;
3232
pub use tracing;
3333

34-
use self::catalog::SinkType;
34+
use self::catalog::{SinkCatalog, SinkType};
3535
use crate::sink::console::{ConsoleConfig, ConsoleSink, CONSOLE_SINK};
3636
use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK};
3737
use crate::sink::redis::{RedisConfig, RedisSink};
@@ -160,6 +160,37 @@ impl SinkImpl {
160160
SinkConfig::BlackHole => SinkImpl::Blackhole,
161161
})
162162
}
163+
164+
pub async fn validate(
165+
cfg: SinkConfig,
166+
sink_catalog: SinkCatalog,
167+
connector_rpc_endpoint: Option<String>,
168+
) -> Result<()> {
169+
match cfg {
170+
SinkConfig::Redis(cfg) => RedisSink::new(cfg, sink_catalog.schema()).map(|_| ()),
171+
SinkConfig::Kafka(cfg) => {
172+
// We simply call `KafkaSink::new` here to validate a Kafka sink.
173+
if sink_catalog.sink_type.is_append_only() {
174+
KafkaSink::<true>::new(*cfg, sink_catalog.schema(), sink_catalog.pk_indices())
175+
.await
176+
.map(|_| ())
177+
} else {
178+
KafkaSink::<false>::new(*cfg, sink_catalog.schema(), sink_catalog.pk_indices())
179+
.await
180+
.map(|_| ())
181+
}
182+
}
183+
SinkConfig::Remote(cfg) => {
184+
if sink_catalog.sink_type.is_append_only() {
185+
RemoteSink::<true>::validate(cfg, sink_catalog, connector_rpc_endpoint).await
186+
} else {
187+
RemoteSink::<false>::validate(cfg, sink_catalog, connector_rpc_endpoint).await
188+
}
189+
}
190+
SinkConfig::Console(_) => Ok(()),
191+
SinkConfig::BlackHole => Ok(()),
192+
}
193+
}
163194
}
164195

165196
macro_rules! impl_sink {
@@ -222,7 +253,7 @@ pub enum SinkError {
222253

223254
impl From<RpcError> for SinkError {
224255
fn from(value: RpcError) -> Self {
225-
SinkError::Remote(format!("{:?}", value))
256+
SinkError::Remote(format!("{}", value))
226257
}
227258
}
228259

0 commit comments

Comments
 (0)