Skip to content

Commit 7cd7c9d

Browse files
authored
fix(conector-node): do not store sink row inside upsert iceberg sink (risingwavelabs#8625)
1 parent 6fd8821 commit 7cd7c9d

File tree

4 files changed

+91
-42
lines changed

4 files changed

+91
-42
lines changed

java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowMap.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.grpc.Status;
2020
import java.util.List;
2121
import java.util.TreeMap;
22+
import org.apache.iceberg.data.Record;
2223

2324
public class SinkRowMap {
2425
TreeMap<List<Comparable<Object>>, SinkRowOp> map = new TreeMap<>(new PkComparator());
@@ -27,7 +28,7 @@ public void clear() {
2728
map.clear();
2829
}
2930

30-
public void insert(List<Comparable<Object>> key, SinkRow row) {
31+
public void insert(List<Comparable<Object>> key, Record row) {
3132
if (!map.containsKey(key)) {
3233
map.put(key, SinkRowOp.insertOp(row));
3334
} else {
@@ -42,19 +43,20 @@ public void insert(List<Comparable<Object>> key, SinkRow row) {
4243
}
4344
}
4445

45-
public void delete(List<Comparable<Object>> key, SinkRow row) {
46+
public void delete(List<Comparable<Object>> key, Record row) {
4647
if (!map.containsKey(key)) {
4748
map.put(key, SinkRowOp.deleteOp(row));
4849
} else {
4950
SinkRowOp sinkRowOp = map.get(key);
50-
SinkRow insert = sinkRowOp.getInsert();
51+
Record insert = sinkRowOp.getInsert();
5152
if (insert == null) {
5253
throw Status.FAILED_PRECONDITION
5354
.withDescription("try to double delete a primary key")
5455
.asRuntimeException();
5556
}
56-
assertRowValuesEqual(insert, row);
57-
SinkRow delete = sinkRowOp.getDelete();
57+
// TODO: may enable it again
58+
// assertRowValuesEqual(insert, row);
59+
Record delete = sinkRowOp.getDelete();
5860
if (delete != null) {
5961
map.put(key, SinkRowOp.deleteOp(delete));
6062
} else {

java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowOp.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414

1515
package com.risingwave.connector;
1616

17-
import com.risingwave.connector.api.sink.SinkRow;
1817
import io.grpc.Status;
18+
import org.apache.iceberg.data.Record;
1919

2020
public class SinkRowOp {
21-
private final SinkRow delete;
22-
private final SinkRow insert;
21+
private final Record delete;
22+
private final Record insert;
2323

24-
public static SinkRowOp insertOp(SinkRow row) {
24+
public static SinkRowOp insertOp(Record row) {
2525
if (row == null) {
2626
throw Status.FAILED_PRECONDITION
2727
.withDescription("row op must not be null to initialize insertOp")
@@ -30,7 +30,7 @@ public static SinkRowOp insertOp(SinkRow row) {
3030
return new SinkRowOp(null, row);
3131
}
3232

33-
public static SinkRowOp deleteOp(SinkRow row) {
33+
public static SinkRowOp deleteOp(Record row) {
3434
if (row == null) {
3535
throw Status.FAILED_PRECONDITION
3636
.withDescription("row op must not be null to initialize deleteOp")
@@ -39,7 +39,7 @@ public static SinkRowOp deleteOp(SinkRow row) {
3939
return new SinkRowOp(row, null);
4040
}
4141

42-
public static SinkRowOp updateOp(SinkRow delete, SinkRow insert) {
42+
public static SinkRowOp updateOp(Record delete, Record insert) {
4343
if (delete == null || insert == null) {
4444
throw Status.FAILED_PRECONDITION
4545
.withDescription("row ops must not be null initialize updateOp")
@@ -48,7 +48,7 @@ public static SinkRowOp updateOp(SinkRow delete, SinkRow insert) {
4848
return new SinkRowOp(delete, insert);
4949
}
5050

51-
private SinkRowOp(SinkRow delete, SinkRow insert) {
51+
private SinkRowOp(Record delete, Record insert) {
5252
this.delete = delete;
5353
this.insert = insert;
5454
}
@@ -57,11 +57,11 @@ public boolean isDelete() {
5757
return insert == null && delete != null;
5858
}
5959

60-
public SinkRow getDelete() {
60+
public Record getDelete() {
6161
return delete;
6262
}
6363

64-
public SinkRow getInsert() {
64+
public Record getInsert() {
6565
return insert;
6666
}
6767
}

java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSink.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public UpsertIcebergSink(
7575
.collect(Collectors.toList());
7676
}
7777

78-
private Record newRecord(Schema schema, SinkRow row) {
78+
private static Record newRecord(Schema schema, SinkRow row) {
7979
Record record = GenericRecord.create(schema);
8080
for (int i = 0; i < schema.columns().size(); i++) {
8181
record.set(i, row.get(i));
@@ -174,10 +174,10 @@ public void write(Iterator<SinkRow> rows) {
174174
}
175175
switch (row.getOp()) {
176176
case INSERT:
177-
sinkRowMap.insert(getKeyFromRow(row), row);
177+
sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
178178
break;
179179
case DELETE:
180-
sinkRowMap.delete(getKeyFromRow(row), row);
180+
sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
181181
break;
182182
case UPDATE_DELETE:
183183
if (updateBufferExists) {
@@ -186,7 +186,7 @@ public void write(Iterator<SinkRow> rows) {
186186
"an UPDATE_INSERT should precede an UPDATE_DELETE")
187187
.asRuntimeException();
188188
}
189-
sinkRowMap.delete(getKeyFromRow(row), row);
189+
sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
190190
updateBufferExists = true;
191191
break;
192192
case UPDATE_INSERT:
@@ -196,7 +196,7 @@ public void write(Iterator<SinkRow> rows) {
196196
"an UPDATE_INSERT should precede an UPDATE_DELETE")
197197
.asRuntimeException();
198198
}
199-
sinkRowMap.insert(getKeyFromRow(row), row);
199+
sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
200200
updateBufferExists = false;
201201
break;
202202
default:
@@ -217,13 +217,13 @@ public void sync() {
217217
newEqualityDeleteWriter(entry.getKey());
218218
DataWriter<Record> dataWriter = newDataWriter(entry.getKey());
219219
for (SinkRowOp sinkRowOp : entry.getValue().map.values()) {
220-
SinkRow insert = sinkRowOp.getInsert();
221-
SinkRow delete = sinkRowOp.getDelete();
220+
Record insert = sinkRowOp.getInsert();
221+
Record delete = sinkRowOp.getDelete();
222222
if (insert != null) {
223-
dataWriter.write(newRecord(rowSchema, insert));
223+
dataWriter.write(insert);
224224
}
225225
if (delete != null) {
226-
equalityDeleteWriter.write(newRecord(deleteRowSchema, delete));
226+
equalityDeleteWriter.write(delete);
227227
}
228228
}
229229
try {

java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/SinkRowMapTest.java

Lines changed: 66 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
import com.risingwave.proto.Data;
2222
import java.util.ArrayList;
2323
import java.util.List;
24+
import org.apache.iceberg.Schema;
25+
import org.apache.iceberg.data.GenericRecord;
26+
import org.apache.iceberg.data.Record;
27+
import org.apache.iceberg.types.Types;
2428
import org.junit.Assert;
2529
import org.junit.Test;
2630

@@ -31,29 +35,42 @@ public void testInsert() {
3135
SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1);
3236
List<Comparable<Object>> key = new ArrayList<>();
3337
key.add((Comparable<Object>) row.get(0));
38+
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
39+
Record r = GenericRecord.create(schema);
40+
r.set(0, row.get(0));
3441

35-
sinkRowMap.insert(key, row);
42+
sinkRowMap.insert(key, r);
3643
assertEquals(1, sinkRowMap.map.size());
3744
assertEquals(null, sinkRowMap.map.get(key).getDelete());
38-
assertEquals(row, sinkRowMap.map.get(key).getInsert());
45+
assertEquals(r, sinkRowMap.map.get(key).getInsert());
3946
}
4047

4148
@Test
4249
public void testInsertAfterDelete() {
4350
SinkRowMap sinkRowMap = new SinkRowMap();
51+
Schema schema =
52+
new Schema(
53+
Types.NestedField.optional(0, "id", Types.IntegerType.get()),
54+
Types.NestedField.optional(1, "name", Types.StringType.get()));
4455

4556
SinkRow row1 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Alice");
4657
List<Comparable<Object>> key1 = new ArrayList<>();
4758
key1.add((Comparable<Object>) row1.get(0));
59+
Record r1 = GenericRecord.create(schema);
60+
r1.set(0, row1.get(0));
61+
r1.set(1, row1.get(1));
4862
SinkRow row2 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Bob");
4963
List<Comparable<Object>> key2 = new ArrayList<>();
5064
key2.add((Comparable<Object>) row2.get(0));
65+
Record r2 = GenericRecord.create(schema);
66+
r2.set(0, row2.get(0));
67+
r2.set(1, row2.get(1));
5168

52-
sinkRowMap.delete(key1, row1);
53-
sinkRowMap.insert(key1, row2);
69+
sinkRowMap.delete(key1, r1);
70+
sinkRowMap.insert(key1, r2);
5471
assertEquals(1, sinkRowMap.map.size());
55-
assertEquals(row1, sinkRowMap.map.get(key1).getDelete());
56-
assertEquals(row2, sinkRowMap.map.get(key1).getInsert());
72+
assertEquals(r1, sinkRowMap.map.get(key1).getDelete());
73+
assertEquals(r2, sinkRowMap.map.get(key1).getInsert());
5774
}
5875

5976
@Test
@@ -62,11 +79,14 @@ public void testInsertAfterInsert() {
6279
SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1);
6380
List<Comparable<Object>> key = new ArrayList<>();
6481
key.add((Comparable<Object>) row.get(0));
82+
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
83+
Record r = GenericRecord.create(schema);
84+
r.set(0, row.get(0));
6585

66-
sinkRowMap.insert(key, row);
86+
sinkRowMap.insert(key, r);
6787
boolean exceptionThrown = false;
6888
try {
69-
sinkRowMap.insert(key, row);
89+
sinkRowMap.insert(key, r);
7090
} catch (RuntimeException e) {
7191
exceptionThrown = true;
7292
Assert.assertTrue(
@@ -87,10 +107,14 @@ public void testDelete() {
87107
List<Comparable<Object>> key = new ArrayList<>();
88108
key.add((Comparable<Object>) row.get(0));
89109

90-
sinkRowMap.delete(key, row);
110+
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
111+
Record r = GenericRecord.create(schema);
112+
r.set(0, row.get(0));
113+
114+
sinkRowMap.delete(key, r);
91115
assertEquals(1, sinkRowMap.map.size());
92116
assertEquals(null, sinkRowMap.map.get(key).getInsert());
93-
assertEquals(row, sinkRowMap.map.get(key).getDelete());
117+
assertEquals(r, sinkRowMap.map.get(key).getDelete());
94118
}
95119

96120
@Test
@@ -100,10 +124,14 @@ public void testDeleteAfterDelete() {
100124
List<Comparable<Object>> key = new ArrayList<>();
101125
key.add((Comparable<Object>) row.get(0));
102126

103-
sinkRowMap.delete(key, row);
127+
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
128+
Record r = GenericRecord.create(schema);
129+
r.set(0, row.get(0));
130+
131+
sinkRowMap.delete(key, r);
104132
boolean exceptionThrown = false;
105133
try {
106-
sinkRowMap.delete(key, row);
134+
sinkRowMap.delete(key, r);
107135
} catch (RuntimeException e) {
108136
exceptionThrown = true;
109137
Assert.assertTrue(
@@ -122,28 +150,44 @@ public void testDeleteAfterInsert() {
122150
List<Comparable<Object>> key = new ArrayList<>();
123151
key.add((Comparable<Object>) row.get(0));
124152

125-
sinkRowMap.insert(key, row);
126-
sinkRowMap.delete(key, row);
153+
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
154+
Record r = GenericRecord.create(schema);
155+
r.set(0, row.get(0));
156+
157+
sinkRowMap.insert(key, r);
158+
sinkRowMap.delete(key, r);
127159
assertEquals(0, sinkRowMap.map.size());
128160
}
129161

130162
@Test
131163
public void testDeleteAfterUpdate() {
132164
SinkRowMap sinkRowMap = new SinkRowMap();
133165

166+
Schema schema =
167+
new Schema(
168+
Types.NestedField.optional(0, "id", Types.IntegerType.get()),
169+
Types.NestedField.optional(1, "name", Types.StringType.get()));
170+
134171
SinkRow row1 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Alice");
135172
List<Comparable<Object>> key1 = new ArrayList<>();
136173
key1.add((Comparable<Object>) row1.get(0));
174+
Record r1 = GenericRecord.create(schema);
175+
r1.set(0, row1.get(0));
176+
r1.set(1, row1.get(1));
177+
137178
SinkRow row2 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Clare");
138179
List<Comparable<Object>> key2 = new ArrayList<>();
139180
key2.add((Comparable<Object>) row2.get(0));
181+
Record r2 = GenericRecord.create(schema);
182+
r2.set(0, row2.get(0));
183+
r2.set(1, row2.get(1));
140184

141-
sinkRowMap.delete(key1, row1);
142-
sinkRowMap.insert(key2, row2);
143-
sinkRowMap.delete(key2, row2);
185+
sinkRowMap.delete(key1, r1);
186+
sinkRowMap.insert(key2, r2);
187+
sinkRowMap.delete(key2, r2);
144188
assertEquals(1, sinkRowMap.map.size());
145189
assertEquals(null, sinkRowMap.map.get(key1).getInsert());
146-
assertEquals(row1, sinkRowMap.map.get(key1).getDelete());
190+
assertEquals(r1, sinkRowMap.map.get(key1).getDelete());
147191
}
148192

149193
@Test
@@ -153,7 +197,10 @@ public void testClear() {
153197
SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1);
154198
List<Comparable<Object>> key = new ArrayList<>();
155199
key.add((Comparable<Object>) row.get(0));
156-
sinkRowMap.insert(key, row);
200+
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
201+
Record r = GenericRecord.create(schema);
202+
r.set(0, row.get(0));
203+
sinkRowMap.insert(key, r);
157204

158205
sinkRowMap.clear();
159206
assertEquals(0, sinkRowMap.map.size());

0 commit comments

Comments
 (0)