Skip to content

Commit 8c95702

Browse files
Eric Fu and tabVersion authored
fix(sink): remove unimplemented (risingwavelabs#8622)
Signed-off-by: tabVersion <[email protected]> Co-authored-by: tabVersion <[email protected]>
1 parent 7cd7c9d commit 8c95702

File tree

3 files changed

+102
-140
lines changed

3 files changed

+102
-140
lines changed

src/connector/src/sink/kafka.rs

Lines changed: 3 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,9 @@ use rdkafka::message::ToBytes;
2323
use rdkafka::producer::{BaseRecord, DefaultProducerContext, Producer, ThreadedProducer};
2424
use rdkafka::types::RDKafkaErrorCode;
2525
use rdkafka::ClientConfig;
26-
use risingwave_common::array::{ArrayError, ArrayResult, Op, RowRef, StreamChunk};
26+
use risingwave_common::array::{Op, RowRef, StreamChunk};
2727
use risingwave_common::catalog::{Field, Schema};
2828
use risingwave_common::row::Row;
29-
use risingwave_common::types::to_text::ToText;
30-
use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl};
31-
use risingwave_common::util::iter_util::ZipEqFast;
3229
use serde_derive::Deserialize;
3330
use serde_json::{json, Map, Value};
3431
use tracing::warn;
@@ -37,7 +34,7 @@ use super::{
3734
Sink, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
3835
};
3936
use crate::common::KafkaCommon;
40-
use crate::sink::Result;
37+
use crate::sink::{datum_to_json_object, record_to_json, Result};
4138
use crate::{deserialize_bool_from_string, deserialize_duration_from_string};
4239

4340
pub const KAFKA_SINK: &str = "kafka";
@@ -386,96 +383,6 @@ impl<const APPEND_ONLY: bool> Debug for KafkaSink<APPEND_ONLY> {
386383
}
387384
}
388385

389-
fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult<Value> {
390-
let scalar_ref = match datum {
391-
None => return Ok(Value::Null),
392-
Some(datum) => datum,
393-
};
394-
395-
let data_type = field.data_type();
396-
397-
tracing::debug!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
398-
399-
let value = match (data_type, scalar_ref) {
400-
(DataType::Boolean, ScalarRefImpl::Bool(v)) => {
401-
json!(v)
402-
}
403-
(DataType::Int16, ScalarRefImpl::Int16(v)) => {
404-
json!(v)
405-
}
406-
(DataType::Int32, ScalarRefImpl::Int32(v)) => {
407-
json!(v)
408-
}
409-
(DataType::Int64, ScalarRefImpl::Int64(v)) => {
410-
json!(v)
411-
}
412-
(DataType::Float32, ScalarRefImpl::Float32(v)) => {
413-
json!(f32::from(v))
414-
}
415-
(DataType::Float64, ScalarRefImpl::Float64(v)) => {
416-
json!(f64::from(v))
417-
}
418-
(DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
419-
json!(v)
420-
}
421-
(DataType::Decimal, ScalarRefImpl::Decimal(v)) => {
422-
// fixme
423-
json!(v.to_text())
424-
}
425-
(
426-
dt @ DataType::Date
427-
| dt @ DataType::Time
428-
| dt @ DataType::Timestamp
429-
| dt @ DataType::Timestamptz
430-
| dt @ DataType::Interval
431-
| dt @ DataType::Bytea,
432-
scalar,
433-
) => {
434-
json!(scalar.to_text_with_type(&dt))
435-
}
436-
(DataType::List { datatype }, ScalarRefImpl::List(list_ref)) => {
437-
let mut vec = Vec::with_capacity(list_ref.values_ref().len());
438-
let inner_field = Field::unnamed(Box::<DataType>::into_inner(datatype));
439-
for sub_datum_ref in list_ref.values_ref() {
440-
let value = datum_to_json_object(&inner_field, sub_datum_ref)?;
441-
vec.push(value);
442-
}
443-
json!(vec)
444-
}
445-
(DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
446-
let mut map = Map::with_capacity(st.fields.len());
447-
for (sub_datum_ref, sub_field) in struct_ref.fields_ref().into_iter().zip_eq_fast(
448-
st.fields
449-
.iter()
450-
.zip_eq_fast(st.field_names.iter())
451-
.map(|(dt, name)| Field::with_name(dt.clone(), name)),
452-
) {
453-
let value = datum_to_json_object(&sub_field, sub_datum_ref)?;
454-
map.insert(sub_field.name.clone(), value);
455-
}
456-
json!(map)
457-
}
458-
_ => {
459-
return Err(ArrayError::internal(
460-
"datum_to_json_object: unsupported data type".to_string(),
461-
));
462-
}
463-
};
464-
465-
Ok(value)
466-
}
467-
468-
fn record_to_json(row: RowRef<'_>, schema: &[Field]) -> Result<Map<String, Value>> {
469-
let mut mappings = Map::with_capacity(schema.len());
470-
for (field, datum_ref) in schema.iter().zip_eq_fast(row.iter()) {
471-
let key = field.name.clone();
472-
let value = datum_to_json_object(field, datum_ref)
473-
.map_err(|e| SinkError::JsonParse(e.to_string()))?;
474-
mappings.insert(key, value);
475-
}
476-
Ok(mappings)
477-
}
478-
479386
fn pk_to_json(
480387
row: RowRef<'_>,
481388
schema: &[Field],
@@ -611,6 +518,7 @@ impl KafkaTransactionConductor {
611518
mod test {
612519
use maplit::hashmap;
613520
use risingwave_common::test_prelude::StreamChunkTestExt;
521+
use risingwave_common::types::DataType;
614522

615523
use super::*;
616524

src/connector/src/sink/mod.rs

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,16 @@ use std::collections::HashMap;
2323
use anyhow::anyhow;
2424
use async_trait::async_trait;
2525
use enum_as_inner::EnumAsInner;
26-
use risingwave_common::array::StreamChunk;
27-
use risingwave_common::catalog::Schema;
26+
use risingwave_common::array::{ArrayError, ArrayResult, RowRef, StreamChunk};
27+
use risingwave_common::catalog::{Field, Schema};
2828
use risingwave_common::error::{ErrorCode, RwError};
29+
use risingwave_common::row::Row;
30+
use risingwave_common::types::to_text::ToText;
31+
use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl};
32+
use risingwave_common::util::iter_util::ZipEqFast;
2933
use risingwave_rpc_client::error::RpcError;
3034
use serde::{Deserialize, Serialize};
35+
use serde_json::{json, Map, Value};
3136
use thiserror::Error;
3237
pub use tracing;
3338

@@ -37,7 +42,6 @@ use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK};
3742
use crate::sink::redis::{RedisConfig, RedisSink};
3843
use crate::sink::remote::{RemoteConfig, RemoteSink};
3944
use crate::ConnectorParams;
40-
4145
pub const SINK_TYPE_OPTION: &str = "type";
4246
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
4347
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
@@ -257,3 +261,93 @@ impl From<SinkError> for RwError {
257261
ErrorCode::SinkError(Box::new(e)).into()
258262
}
259263
}
264+
265+
pub fn record_to_json(row: RowRef<'_>, schema: &[Field]) -> Result<Map<String, Value>> {
266+
let mut mappings = Map::with_capacity(schema.len());
267+
for (field, datum_ref) in schema.iter().zip_eq_fast(row.iter()) {
268+
let key = field.name.clone();
269+
let value = datum_to_json_object(field, datum_ref)
270+
.map_err(|e| SinkError::JsonParse(e.to_string()))?;
271+
mappings.insert(key, value);
272+
}
273+
Ok(mappings)
274+
}
275+
276+
fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult<Value> {
277+
let scalar_ref = match datum {
278+
None => return Ok(Value::Null),
279+
Some(datum) => datum,
280+
};
281+
282+
let data_type = field.data_type();
283+
284+
tracing::debug!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
285+
286+
let value = match (data_type, scalar_ref) {
287+
(DataType::Boolean, ScalarRefImpl::Bool(v)) => {
288+
json!(v)
289+
}
290+
(DataType::Int16, ScalarRefImpl::Int16(v)) => {
291+
json!(v)
292+
}
293+
(DataType::Int32, ScalarRefImpl::Int32(v)) => {
294+
json!(v)
295+
}
296+
(DataType::Int64, ScalarRefImpl::Int64(v)) => {
297+
json!(v)
298+
}
299+
(DataType::Float32, ScalarRefImpl::Float32(v)) => {
300+
json!(f32::from(v))
301+
}
302+
(DataType::Float64, ScalarRefImpl::Float64(v)) => {
303+
json!(f64::from(v))
304+
}
305+
(DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
306+
json!(v)
307+
}
308+
(DataType::Decimal, ScalarRefImpl::Decimal(v)) => {
309+
// fixme
310+
json!(v.to_text())
311+
}
312+
(
313+
dt @ DataType::Date
314+
| dt @ DataType::Time
315+
| dt @ DataType::Timestamp
316+
| dt @ DataType::Timestamptz
317+
| dt @ DataType::Interval
318+
| dt @ DataType::Bytea,
319+
scalar,
320+
) => {
321+
json!(scalar.to_text_with_type(&dt))
322+
}
323+
(DataType::List { datatype }, ScalarRefImpl::List(list_ref)) => {
324+
let mut vec = Vec::with_capacity(list_ref.values_ref().len());
325+
let inner_field = Field::unnamed(Box::<DataType>::into_inner(datatype));
326+
for sub_datum_ref in list_ref.values_ref() {
327+
let value = datum_to_json_object(&inner_field, sub_datum_ref)?;
328+
vec.push(value);
329+
}
330+
json!(vec)
331+
}
332+
(DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
333+
let mut map = Map::with_capacity(st.fields.len());
334+
for (sub_datum_ref, sub_field) in struct_ref.fields_ref().into_iter().zip_eq_fast(
335+
st.fields
336+
.iter()
337+
.zip_eq_fast(st.field_names.iter())
338+
.map(|(dt, name)| Field::with_name(dt.clone(), name)),
339+
) {
340+
let value = datum_to_json_object(&sub_field, sub_datum_ref)?;
341+
map.insert(sub_field.name.clone(), value);
342+
}
343+
json!(map)
344+
}
345+
_ => {
346+
return Err(ArrayError::internal(
347+
"datum_to_json_object: unsupported data type".to_string(),
348+
));
349+
}
350+
};
351+
352+
Ok(value)
353+
}

src/connector/src/sink/remote.rs

Lines changed: 2 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,9 @@ use risingwave_common::array::StreamChunk;
2121
#[cfg(test)]
2222
use risingwave_common::catalog::Field;
2323
use risingwave_common::catalog::Schema;
24-
use risingwave_common::row::Row;
2524
#[cfg(test)]
2625
use risingwave_common::types::DataType;
27-
use risingwave_common::types::{DatumRef, ScalarRefImpl};
2826
use risingwave_common::util::addr::HostAddr;
29-
use risingwave_common::util::iter_util::ZipEqFast;
3027
use risingwave_pb::connector_service::sink_stream_request::write_batch::json_payload::RowOp;
3128
use risingwave_pb::connector_service::sink_stream_request::write_batch::{JsonPayload, Payload};
3229
use risingwave_pb::connector_service::sink_stream_request::{
@@ -35,14 +32,12 @@ use risingwave_pb::connector_service::sink_stream_request::{
3532
use risingwave_pb::connector_service::table_schema::Column;
3633
use risingwave_pb::connector_service::{SinkResponse, SinkStreamRequest, TableSchema};
3734
use risingwave_rpc_client::ConnectorClient;
38-
use serde_json::Value;
39-
use serde_json::Value::Number;
4035
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
4136
use tokio_stream::StreamExt;
4237
use tonic::{Status, Streaming};
4338

4439
use super::catalog::SinkCatalog;
45-
use crate::sink::{Result, Sink, SinkError};
40+
use crate::sink::{record_to_json, Result, Sink, SinkError};
4641
use crate::ConnectorParams;
4742

4843
pub const VALID_REMOTE_SINKS: [&str; 3] = ["jdbc", "file", "iceberg"];
@@ -254,13 +249,7 @@ impl<const APPEND_ONLY: bool> Sink for RemoteSink<APPEND_ONLY> {
254249
async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
255250
let mut row_ops = vec![];
256251
for (op, row_ref) in chunk.rows() {
257-
let mut map = serde_json::Map::new();
258-
row_ref
259-
.iter()
260-
.zip_eq_fast(self.schema.fields.iter())
261-
.for_each(|(v, f)| {
262-
map.insert(f.name.clone(), parse_datum(v));
263-
});
252+
let map = record_to_json(row_ref, &self.schema.fields)?;
264253
let row_op = RowOp {
265254
op_type: op.to_protobuf() as i32,
266255
line: serde_json::to_string(&map)
@@ -319,35 +308,6 @@ impl<const APPEND_ONLY: bool> Sink for RemoteSink<APPEND_ONLY> {
319308
}
320309
}
321310

322-
fn parse_datum(datum: DatumRef<'_>) -> Value {
323-
match datum {
324-
None => Value::Null,
325-
Some(ScalarRefImpl::Int32(v)) => Value::from(v),
326-
Some(ScalarRefImpl::Int64(v)) => Value::from(v),
327-
Some(ScalarRefImpl::Float32(v)) => Value::from(v.into_inner()),
328-
Some(ScalarRefImpl::Float64(v)) => Value::from(v.into_inner()),
329-
Some(ScalarRefImpl::Decimal(v)) => Number(v.to_string().parse().unwrap()),
330-
Some(ScalarRefImpl::Utf8(v)) => Value::from(v),
331-
Some(ScalarRefImpl::Bool(v)) => Value::from(v),
332-
Some(ScalarRefImpl::NaiveDate(v)) => Value::from(v.to_string()),
333-
Some(ScalarRefImpl::NaiveTime(v)) => Value::from(v.to_string()),
334-
Some(ScalarRefImpl::Interval(v)) => Value::from(v.to_string()),
335-
Some(ScalarRefImpl::Struct(v)) => Value::from(
336-
v.fields_ref()
337-
.iter()
338-
.map(|v| parse_datum(*v))
339-
.collect::<Vec<_>>(),
340-
),
341-
Some(ScalarRefImpl::List(v)) => Value::from(
342-
v.values_ref()
343-
.iter()
344-
.map(|v| parse_datum(*v))
345-
.collect::<Vec<_>>(),
346-
),
347-
_ => unimplemented!(),
348-
}
349-
}
350-
351311
#[cfg(test)]
352312
mod test {
353313
use std::sync::Arc;

0 commit comments

Comments
 (0)