Skip to content

Commit f7d89cc

Browse files
authored
Merge branch 'main' into lz/auto-cherrypick-pr
2 parents de55cf4 + 608e183 commit f7d89cc

File tree

4 files changed

+72
-20
lines changed

4 files changed

+72
-20
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/connector/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,7 @@ itertools = "0.10"
5454
maplit = "1.0.2"
5555
moka = { version = "0.10", features = ["future"] }
5656
nexmark = { version = "0.2", features = ["serde"] }
57+
num-bigint = "0.4"
5758
num-traits = "0.2"
5859
parking_lot = "0.12"
5960
prometheus = { version = "0.13", features = ["process"] }

src/connector/src/parser/avro/util.rs

Lines changed: 33 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,8 @@ use risingwave_pb::plan_common::ColumnDesc;
2424
use crate::parser::unified::avro::AvroParseOptions;
2525

2626
const RW_DECIMAL_MAX_PRECISION: usize = 28;
27+
const DBZ_VARIABLE_SCALE_DECIMAL_NAME: &str = "VariableScaleDecimal";
28+
const DBZ_VARIABLE_SCALE_DECIMAL_NAMESPACE: &str = "io.debezium.data";
2729

2830
pub(crate) fn avro_field_to_column_desc(
2931
name: &str,
@@ -71,14 +73,29 @@ fn avro_type_mapping(schema: &Schema) -> Result<DataType> {
7173
Schema::Boolean => DataType::Boolean,
7274
Schema::Float => DataType::Float32,
7375
Schema::Double => DataType::Float64,
74-
Schema::Decimal { .. } => DataType::Decimal,
76+
Schema::Decimal { precision, .. } => {
77+
if precision > &RW_DECIMAL_MAX_PRECISION {
78+
tracing::warn!(
79+
"RisingWave supports decimal precision up to {}, but got {}. Will truncate.",
80+
RW_DECIMAL_MAX_PRECISION,
81+
precision
82+
);
83+
}
84+
DataType::Decimal
85+
}
7586
Schema::Date => DataType::Date,
7687
Schema::TimestampMillis => DataType::Timestamptz,
7788
Schema::TimestampMicros => DataType::Timestamptz,
7889
Schema::Duration => DataType::Interval,
7990
Schema::Bytes => DataType::Bytea,
8091
Schema::Enum { .. } => DataType::Varchar,
81-
Schema::Record { fields, .. } => {
92+
Schema::Record { fields, name, .. } => {
93+
if name.name == DBZ_VARIABLE_SCALE_DECIMAL_NAME
94+
&& name.namespace == Some(DBZ_VARIABLE_SCALE_DECIMAL_NAMESPACE.into())
95+
{
96+
return Ok(DataType::Decimal);
97+
}
98+
8299
let struct_fields = fields
83100
.iter()
84101
.map(|f| avro_type_mapping(&f.schema))
@@ -140,20 +157,24 @@ pub(crate) fn get_field_from_avro_value<'a>(
140157

141158
pub(crate) fn avro_decimal_to_rust_decimal(
142159
avro_decimal: AvroDecimal,
143-
precision: usize,
160+
_precision: usize,
144161
scale: usize,
145162
) -> Result<rust_decimal::Decimal> {
146-
if precision > RW_DECIMAL_MAX_PRECISION {
147-
return Err(RwError::from(ProtocolError(format!(
148-
"only support decimal with max precision {} but given avro decimal with precision {}",
149-
RW_DECIMAL_MAX_PRECISION, precision
150-
))));
151-
}
152-
153163
let negative = !avro_decimal.is_positive();
154164
let bytes = avro_decimal.to_vec_unsigned();
155165

156-
let (lo, mid, hi) = match bytes.len() {
166+
let (lo, mid, hi) = extract_decimal(bytes);
167+
Ok(rust_decimal::Decimal::from_parts(
168+
lo,
169+
mid,
170+
hi,
171+
negative,
172+
scale as u32,
173+
))
174+
}
175+
176+
pub(crate) fn extract_decimal(bytes: Vec<u8>) -> (u32, u32, u32) {
177+
match bytes.len() {
157178
len @ 0..=4 => {
158179
let mut pad = vec![0; 4 - len];
159180
pad.extend_from_slice(&bytes);
@@ -176,14 +197,7 @@ pub(crate) fn avro_decimal_to_rust_decimal(
176197
(lo, mid, hi)
177198
}
178199
_ => unreachable!(),
179-
};
180-
Ok(rust_decimal::Decimal::from_parts(
181-
lo,
182-
mid,
183-
hi,
184-
negative,
185-
scale as u32,
186-
))
200+
}
187201
}
188202

189203
pub(crate) fn unix_epoch_days() -> i32 {

src/connector/src/parser/unified/avro.rs

Lines changed: 37 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,17 +14,19 @@
1414

1515
use std::str::FromStr;
1616

17+
use anyhow::anyhow;
1718
use apache_avro::types::Value;
1819
use apache_avro::Schema;
1920
use itertools::Itertools;
21+
use num_bigint::{BigInt, Sign};
2022
use risingwave_common::array::{ListValue, StructValue};
2123
use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz};
2224
use risingwave_common::types::{DataType, Date, Datum, Interval, JsonbVal, ScalarImpl};
2325
use risingwave_common::util::iter_util::ZipEqFast;
2426

2527
use super::{Access, AccessError, AccessResult};
2628
use crate::parser::avro::util::{
27-
avro_decimal_to_rust_decimal, extract_inner_field_schema, unix_epoch_days,
29+
avro_decimal_to_rust_decimal, extract_decimal, extract_inner_field_schema, unix_epoch_days,
2830
};
2931
#[derive(Clone)]
3032
/// Options for parsing an `AvroValue` into Datum, with an optional avro schema.
@@ -118,6 +120,40 @@ impl<'a> AvroParseOptions<'a> {
118120
.map_err(|_| create_error())?;
119121
ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
120122
}
123+
(Some(DataType::Decimal), Value::Record(fields)) => {
124+
// Debezium's `VariableScaleDecimal` record has a fixed layout with two fields: `scale` (int) and `value` (bytes).
125+
let find_in_records = |field_name: &str| {
126+
fields
127+
.iter()
128+
.find(|field| field.0 == field_name)
129+
.map(|field| &field.1)
130+
};
131+
let scale = match find_in_records("scale").ok_or(AccessError::Other(anyhow!(
132+
"scale field not found in VariableScaleDecimal"
133+
)))? {
134+
Value::Int(scale) => Ok(*scale),
135+
avro_value => Err(AccessError::Other(anyhow!(
136+
"scale field in VariableScaleDecimal is not int, got {:?}",
137+
avro_value
138+
))),
139+
}?;
140+
141+
let value: BigInt = match find_in_records("value").ok_or(AccessError::Other(
142+
anyhow!("value field not found in VariableScaleDecimal"),
143+
))? {
144+
Value::Bytes(bytes) => Ok(BigInt::from_signed_bytes_be(bytes)),
145+
avro_value => Err(AccessError::Other(anyhow!(
146+
"value field in VariableScaleDecimal is not bytes, got {:?}",
147+
avro_value
148+
))),
149+
}?;
150+
151+
let negative = value.sign() == Sign::Minus;
152+
let (lo, mid, hi) = extract_decimal(value.to_bytes_be().1);
153+
let decimal =
154+
rust_decimal::Decimal::from_parts(lo, mid, hi, negative, scale as u32);
155+
ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
156+
}
121157

122158
// ---- Date -----
123159
(Some(DataType::Date) | None, Value::Date(days)) => {

0 commit comments

Comments
 (0)