Skip to content

Commit dd218e3

Browse files
committed
fix coroutine
Signed-off-by: xxchan <[email protected]>
1 parent c0bafd9 commit dd218e3

File tree

3 files changed

+121
-112
lines changed

3 files changed

+121
-112
lines changed

src/connector/src/sink/formatter/append_only.rs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,22 @@ impl<KE: RowEncoder, VE: RowEncoder> SinkFormatter for AppendOnlyFormatter<KE, V
4040
&self,
4141
chunk: &StreamChunk,
4242
) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>> {
43-
std::iter::from_coroutine(|| {
44-
for (op, row) in chunk.rows() {
45-
if op != Op::Insert {
46-
continue;
47-
}
48-
let event_key_object = match &self.key_encoder {
49-
Some(key_encoder) => Some(tri!(key_encoder.encode(row))),
50-
None => None,
51-
};
52-
let event_object = Some(tri!(self.val_encoder.encode(row)));
43+
std::iter::from_coroutine(
44+
#[coroutine]
45+
|| {
46+
for (op, row) in chunk.rows() {
47+
if op != Op::Insert {
48+
continue;
49+
}
50+
let event_key_object = match &self.key_encoder {
51+
Some(key_encoder) => Some(tri!(key_encoder.encode(row))),
52+
None => None,
53+
};
54+
let event_object = Some(tri!(self.val_encoder.encode(row)));
5355

54-
yield Ok((event_key_object, event_object))
55-
}
56-
})
56+
yield Ok((event_key_object, event_object))
57+
}
58+
},
59+
)
5760
}
5861
}

src/connector/src/sink/formatter/debezium_json.rs

Lines changed: 85 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -98,100 +98,103 @@ impl SinkFormatter for DebeziumJsonFormatter {
9898
&self,
9999
chunk: &StreamChunk,
100100
) -> impl Iterator<Item = Result<(Option<Value>, Option<Value>)>> {
101-
std::iter::from_coroutine(|| {
102-
let DebeziumJsonFormatter {
103-
schema,
104-
pk_indices,
105-
db_name,
106-
sink_from_name,
107-
opts,
108-
key_encoder,
109-
val_encoder,
110-
} = self;
111-
let ts_ms = SystemTime::now()
112-
.duration_since(UNIX_EPOCH)
113-
.unwrap()
114-
.as_millis() as u64;
115-
let source_field = json!({
116-
// todo: still some missing fields in source field
117-
// ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events
118-
"db": db_name,
119-
"table": sink_from_name,
120-
"ts_ms": ts_ms,
121-
});
122-
123-
let mut update_cache: Option<Map<String, Value>> = None;
124-
125-
for (op, row) in chunk.rows() {
126-
let event_key_object: Option<Value> = Some(json!({
127-
"schema": json!({
128-
"type": "struct",
129-
"fields": fields_pk_to_json(&schema.fields, pk_indices),
130-
"optional": false,
131-
"name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
132-
}),
133-
"payload": tri!(key_encoder.encode(row)),
134-
}));
135-
let event_object: Option<Value> = match op {
136-
Op::Insert => Some(json!({
137-
"schema": schema_to_json(schema, db_name, sink_from_name),
138-
"payload": {
139-
"before": null,
140-
"after": tri!(val_encoder.encode(row)),
141-
"op": "c",
142-
"ts_ms": ts_ms,
143-
"source": source_field,
144-
}
145-
})),
146-
Op::Delete => {
147-
let value_obj = Some(json!({
101+
std::iter::from_coroutine(
102+
#[coroutine]
103+
|| {
104+
let DebeziumJsonFormatter {
105+
schema,
106+
pk_indices,
107+
db_name,
108+
sink_from_name,
109+
opts,
110+
key_encoder,
111+
val_encoder,
112+
} = self;
113+
let ts_ms = SystemTime::now()
114+
.duration_since(UNIX_EPOCH)
115+
.unwrap()
116+
.as_millis() as u64;
117+
let source_field = json!({
118+
// todo: still some missing fields in source field
119+
// ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events
120+
"db": db_name,
121+
"table": sink_from_name,
122+
"ts_ms": ts_ms,
123+
});
124+
125+
let mut update_cache: Option<Map<String, Value>> = None;
126+
127+
for (op, row) in chunk.rows() {
128+
let event_key_object: Option<Value> = Some(json!({
129+
"schema": json!({
130+
"type": "struct",
131+
"fields": fields_pk_to_json(&schema.fields, pk_indices),
132+
"optional": false,
133+
"name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
134+
}),
135+
"payload": tri!(key_encoder.encode(row)),
136+
}));
137+
let event_object: Option<Value> = match op {
138+
Op::Insert => Some(json!({
148139
"schema": schema_to_json(schema, db_name, sink_from_name),
149140
"payload": {
150-
"before": tri!(val_encoder.encode(row)),
151-
"after": null,
152-
"op": "d",
141+
"before": null,
142+
"after": tri!(val_encoder.encode(row)),
143+
"op": "c",
153144
"ts_ms": ts_ms,
154145
"source": source_field,
155146
}
156-
}));
157-
yield Ok((event_key_object.clone(), value_obj));
158-
159-
if opts.gen_tombstone {
160-
// Tomestone event
161-
// https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events
162-
yield Ok((event_key_object, None));
163-
}
164-
165-
continue;
166-
}
167-
Op::UpdateDelete => {
168-
update_cache = Some(tri!(val_encoder.encode(row)));
169-
continue;
170-
}
171-
Op::UpdateInsert => {
172-
if let Some(before) = update_cache.take() {
173-
Some(json!({
147+
})),
148+
Op::Delete => {
149+
let value_obj = Some(json!({
174150
"schema": schema_to_json(schema, db_name, sink_from_name),
175151
"payload": {
176-
"before": before,
177-
"after": tri!(val_encoder.encode(row)),
178-
"op": "u",
152+
"before": tri!(val_encoder.encode(row)),
153+
"after": null,
154+
"op": "d",
179155
"ts_ms": ts_ms,
180156
"source": source_field,
181157
}
182-
}))
183-
} else {
184-
warn!(
185-
"not found UpdateDelete in prev row, skipping, row index {:?}",
186-
row.index()
187-
);
158+
}));
159+
yield Ok((event_key_object.clone(), value_obj));
160+
161+
if opts.gen_tombstone {
162+
// Tomestone event
163+
// https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events
164+
yield Ok((event_key_object, None));
165+
}
166+
188167
continue;
189168
}
190-
}
191-
};
192-
yield Ok((event_key_object, event_object));
193-
}
194-
})
169+
Op::UpdateDelete => {
170+
update_cache = Some(tri!(val_encoder.encode(row)));
171+
continue;
172+
}
173+
Op::UpdateInsert => {
174+
if let Some(before) = update_cache.take() {
175+
Some(json!({
176+
"schema": schema_to_json(schema, db_name, sink_from_name),
177+
"payload": {
178+
"before": before,
179+
"after": tri!(val_encoder.encode(row)),
180+
"op": "u",
181+
"ts_ms": ts_ms,
182+
"source": source_field,
183+
}
184+
}))
185+
} else {
186+
warn!(
187+
"not found UpdateDelete in prev row, skipping, row index {:?}",
188+
row.index()
189+
);
190+
continue;
191+
}
192+
}
193+
};
194+
yield Ok((event_key_object, event_object));
195+
}
196+
},
197+
)
195198
}
196199
}
197200

src/connector/src/sink/formatter/upsert.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,25 @@ impl<KE: RowEncoder, VE: RowEncoder> SinkFormatter for UpsertFormatter<KE, VE> {
4040
&self,
4141
chunk: &StreamChunk,
4242
) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>> {
43-
std::iter::from_coroutine(|| {
44-
for (op, row) in chunk.rows() {
45-
let event_key_object = Some(tri!(self.key_encoder.encode(row)));
46-
47-
let event_object = match op {
48-
Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))),
49-
// Empty value with a key
50-
Op::Delete => None,
51-
Op::UpdateDelete => {
52-
// upsert semantic does not require update delete event
53-
continue;
54-
}
55-
};
56-
57-
yield Ok((event_key_object, event_object))
58-
}
59-
})
43+
std::iter::from_coroutine(
44+
#[coroutine]
45+
|| {
46+
for (op, row) in chunk.rows() {
47+
let event_key_object = Some(tri!(self.key_encoder.encode(row)));
48+
49+
let event_object = match op {
50+
Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))),
51+
// Empty value with a key
52+
Op::Delete => None,
53+
Op::UpdateDelete => {
54+
// upsert semantic does not require update delete event
55+
continue;
56+
}
57+
};
58+
59+
yield Ok((event_key_object, event_object))
60+
}
61+
},
62+
)
6063
}
6164
}

0 commit comments

Comments
 (0)