Skip to content

Commit 2f43344

Browse files
sfmontyohpgrahsl
authored andcommitted
add support for maps with list values (#86)
* add support for maps with list values * add map of strings and ints to unit test * add example of a map with non-primitive type values * use same fields in all 3 types of docs
1 parent 66c3405 commit 2f43344

File tree

3 files changed

+93
-27
lines changed

3 files changed

+93
-27
lines changed

README.md

+5
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ The conversion is able to generically deal with nested key or value structures -
4848
]}
4949
}},
5050
{"name": "lut", "type": {"type": "map", "values": "double"}},
51+
{"name": "flags",
52+
"type": [ "null",
53+
{"type": "map", "values": {"type": "array", "items": "string"} }
54+
],
55+
"default": null },
5156
{"name": "raw", "type": "bytes"}
5257
]
5358
}

src/main/java/at/grahsl/kafka/connect/mongodb/converter/AvroJsonSchemafulRecordConverter.java

+34-21
Original file line numberDiff line numberDiff line change
@@ -113,55 +113,68 @@ private void processField(BsonDocument doc, Struct struct, Field field) {
113113
case INT64:
114114
case STRING:
115115
case BYTES:
116-
handlePrimitiveField(doc, struct, field);
116+
handlePrimitiveField(doc, struct.get(field), field);
117117
break;
118118
case STRUCT:
119-
handleStructField(doc, struct, field);
119+
handleStructField(doc, (Struct)struct.get(field), field);
120120
break;
121121
case ARRAY:
122-
handleArrayField(doc, struct, field);
122+
handleArrayField(doc, (List)struct.get(field), field);
123123
break;
124124
case MAP:
125-
handleMapField(doc, struct, field);
125+
handleMapField(doc, (Map)struct.get(field), field);
126126
break;
127127
default:
128+
logger.error("Invalid schema. unexpected / unsupported schema type '"
129+
+ field.schema().type() + "' for field '"
130+
+ field.name() + "' value='" + struct + "'");
128131
throw new DataException("unexpected / unsupported schema type " + field.schema().type());
129132
}
130133
} catch (Exception exc) {
134+
logger.error("Error while processing field. schema type '"
135+
+ field.schema().type() + "' for field '"
136+
+ field.name() + "' value='" + struct + "'");
131137
throw new DataException("error while processing field " + field.name(), exc);
132138
}
133139

134140
}
135141

136-
private void handleMapField(BsonDocument doc, Struct struct, Field field) {
142+
private void handleMapField(BsonDocument doc, Map m, Field field) {
137143
logger.trace("handling complex type 'map'");
138-
BsonDocument bd = new BsonDocument();
139-
if(struct.get(field)==null) {
144+
if(m==null) {
140145
logger.trace("no field in struct -> adding null");
141146
doc.put(field.name(), BsonNull.VALUE);
142147
return;
143148
}
144-
Map m = (Map)struct.get(field);
149+
BsonDocument bd = new BsonDocument();
145150
for(Object entry : m.keySet()) {
146151
String key = (String)entry;
147-
if(field.schema().valueSchema().type().isPrimitive()) {
152+
Schema.Type valueSchemaType = field.schema().valueSchema().type();
153+
if(valueSchemaType.isPrimitive()) {
148154
bd.put(key, getConverter(field.schema().valueSchema()).toBson(m.get(key),field.schema()));
155+
} else if (valueSchemaType.equals(Schema.Type.ARRAY)) {
156+
final Field elementField = new Field(key, 0, field.schema().valueSchema());
157+
final List list = (List)m.get(key);
158+
logger.trace("adding array values to {} of type valueSchema={} value='{}'",
159+
elementField.name(), elementField.schema().valueSchema(), list);
160+
handleArrayField(bd, list, elementField);
149161
} else {
150162
bd.put(key, toBsonDoc(field.schema().valueSchema(), m.get(key)));
151163
}
152164
}
153165
doc.put(field.name(), bd);
154166
}
155167

156-
private void handleArrayField(BsonDocument doc, Struct struct, Field field) {
157-
logger.trace("handling complex type 'array'");
158-
BsonArray array = new BsonArray();
159-
if(struct.get(field)==null) {
160-
logger.trace("no field in struct -> adding null");
168+
private void handleArrayField(BsonDocument doc, List list, Field field) {
169+
logger.trace("handling complex type 'array' of types '{}'",
170+
field.schema().valueSchema().type());
171+
if(list==null) {
172+
logger.trace("no array -> adding null");
161173
doc.put(field.name(), BsonNull.VALUE);
162174
return;
163175
}
164-
for(Object element : (List)struct.get(field)) {
176+
BsonArray array = new BsonArray();
177+
for(Object element : list) {
165178
if(field.schema().valueSchema().type().isPrimitive()) {
166179
array.add(getConverter(field.schema().valueSchema()).toBson(element,field.schema()));
167180
} else {
@@ -173,18 +186,18 @@ private void handleArrayField(BsonDocument doc, Struct struct, Field field) {
173186

174187
private void handleStructField(BsonDocument doc, Struct struct, Field field) {
175188
logger.trace("handling complex type 'struct'");
176-
if(struct.get(field)!=null) {
177-
logger.trace(struct.get(field).toString());
178-
doc.put(field.name(), toBsonDoc(field.schema(), struct.get(field)));
189+
if(struct!=null) {
190+
logger.trace(struct.toString());
191+
doc.put(field.name(), toBsonDoc(field.schema(), struct));
179192
} else {
180193
logger.trace("no field in struct -> adding null");
181194
doc.put(field.name(), BsonNull.VALUE);
182195
}
183196
}
184197

185-
private void handlePrimitiveField(BsonDocument doc, Struct struct, Field field) {
186-
logger.trace("handling primitive type '{}'",field.schema().type());
187-
doc.put(field.name(), getConverter(field.schema()).toBson(struct.get(field),field.schema()));
198+
private void handlePrimitiveField(BsonDocument doc, Object value, Field field) {
199+
logger.trace("handling primitive type '{}' name='{}'",field.schema().type(),field.name());
200+
doc.put(field.name(), getConverter(field.schema()).toBson(value,field.schema()));
188201
}
189202

190203
private boolean isSupportedLogicalType(Schema schema) {

src/test/java/at/grahsl/kafka/connect/mongodb/converter/RecordConverterTest.java

+54-6
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Arrays;
3535
import java.util.HashMap;
3636
import java.util.LinkedHashMap;
37+
import java.util.List;
3738
import java.util.Map;
3839

3940
import static org.junit.jupiter.api.Assertions.*;
@@ -45,7 +46,8 @@ public class RecordConverterTest {
4546
public static Schema OBJ_SCHEMA_1;
4647
public static Struct OBJ_STRUCT_1;
4748
public static Map OBJ_MAP_1;
48-
public static BsonDocument EXPECTED_BSON_DOC_BYTES_1;
49+
public static BsonDocument EXPECTED_BSON_DOC_OBJ_STRUCT_1;
50+
public static BsonDocument EXPECTED_BSON_DOC_OBJ_MAP_1;
4951
public static BsonDocument EXPECTED_BSON_DOC_RAW_1;
5052

5153
@BeforeAll
@@ -60,6 +62,8 @@ public static void initializeTestData() {
6062
"\"myArray1\":[\"str_1\",\"str_2\",\"...\",\"str_N\"]," +
6163
"\"myArray2\":[{\"k\":\"a\",\"v\":1},{\"k\":\"b\",\"v\":2},{\"k\":\"c\",\"v\":3}]," +
6264
"\"mySubDoc2\":{\"k1\":9,\"k2\":8,\"k3\":7}," +
65+
"\"myMapOfStrings\":{\"k1\": [ \"v1-a\", \"v1-b\" ],\"k2\": [ \"v2-a\" ],\"k3\":[ \"v3-a\", \"v3-b\", \"v3-c\" ]}," +
66+
"\"myMapOfInts\":{\"k1\": [ 11, 12 ],\"k2\": [ 21 ],\"k3\":[ 31, 32, 33 ]}," +
6367
"\"myBytes\":\"S2Fma2Egcm9ja3Mh\"," +
6468
"\"myDate\": 1489705200000," +
6569
"\"myTimestamp\": 1489705200000," +
@@ -82,6 +86,8 @@ public static void initializeTestData() {
8286
.build())
8387
)
8488
.field("mySubDoc2", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build())
89+
.field( "myMapOfStrings", SchemaBuilder.map(Schema.STRING_SCHEMA, SchemaBuilder.array(Schema.STRING_SCHEMA).build()).build())
90+
.field( "myMapOfInts", SchemaBuilder.map(Schema.STRING_SCHEMA, SchemaBuilder.array(Schema.INT32_SCHEMA).build()).build())
8591
.field("myBytes", Schema.BYTES_SCHEMA)
8692
.field("myDate", Date.SCHEMA)
8793
.field("myTimestamp", Timestamp.SCHEMA)
@@ -108,6 +114,16 @@ public static void initializeTestData() {
108114
)
109115
)
110116
.put("mySubDoc2",new HashMap<String,Integer>(){{ put("k1",9); put("k2",8); put("k3",7);}})
117+
.put("myMapOfStrings", new HashMap<String, List<String>>(){{
118+
put("k1", Arrays.asList("v1-a", "v1-b"));
119+
put("k2", Arrays.asList("v2-a"));
120+
put("k3", Arrays.asList("v3-a", "v3-b", "v3-c"));
121+
}})
122+
.put("myMapOfInts", new HashMap<String, List<Integer>>(){{
123+
put("k1", Arrays.asList(11, 12));
124+
put("k2", Arrays.asList(21));
125+
put("k3", Arrays.asList(31, 32, 33));
126+
}})
111127
.put("myBytes", new byte[]{75, 97, 102, 107, 97, 32, 114, 111, 99, 107, 115, 33})
112128
.put("myDate", java.util.Date.from(ZonedDateTime.of(
113129
LocalDate.of(2017,3,17), LocalTime.MIDNIGHT, ZoneOffset.systemDefault()
@@ -137,6 +153,14 @@ public static void initializeTestData() {
137153
)
138154
);
139155
OBJ_MAP_1.put("mySubDoc2",new HashMap<String,Integer>(){{ put("k1",9); put("k2",8); put("k3",7);}});
156+
OBJ_MAP_1.put("myMapOfStrings",new HashMap<String,List<String>>(){{
157+
put("k1",Arrays.asList("v1-a", "v1-b"));
158+
put("k2",Arrays.asList("v2-a"));
159+
put("k3",Arrays.asList("v3-a", "v3-b", "v3-c"));}});
160+
OBJ_MAP_1.put("myMapOfInts",new HashMap<String,List<Integer>>(){{
161+
put("k1",Arrays.asList(11, 12));
162+
put("k2",Arrays.asList(21));
163+
put("k3",Arrays.asList(31, 32, 33));}});
140164
OBJ_MAP_1.put("myBytes", new byte[]{75, 97, 102, 107, 97, 32, 114, 111, 99, 107, 115, 33});
141165
OBJ_MAP_1.put("myDate", java.util.Date.from(ZonedDateTime.of(
142166
LocalDate.of(2017,3,17), LocalTime.MIDNIGHT, ZoneOffset.systemDefault()
@@ -154,7 +178,7 @@ public static void initializeTestData() {
154178
// thus I'm cheating a little by using a Decimal128 here...
155179
OBJ_MAP_1.put("myDecimal", Decimal128.parse("12345.6789"));
156180

157-
EXPECTED_BSON_DOC_BYTES_1 = new BsonDocument()
181+
BsonDocument commonMapAndStructFields = new BsonDocument()
158182
.append("_id", new BsonString("1234567890"))
159183
.append("myString", new BsonString("some foo bla text"))
160184
.append("myInt", new BsonInt32(42))
@@ -170,10 +194,31 @@ public static void initializeTestData() {
170194
new BsonDocument("k", new BsonString("a")).append("v", new BsonInt32(1)),
171195
new BsonDocument("k", new BsonString("b")).append("v", new BsonInt32(2)),
172196
new BsonDocument("k", new BsonString("c")).append("v", new BsonInt32(3))))
173-
).append("mySubDoc2", new BsonDocument("k1", new BsonInt32(9))
197+
)
198+
.append("mySubDoc2", new BsonDocument("k1", new BsonInt32(9))
174199
.append("k2", new BsonInt32(8))
175200
.append("k3", new BsonInt32(7))
176201
)
202+
.append("myMapOfStrings", new BsonDocument("k1", new BsonInt32(9))
203+
.append("k1", new BsonArray(Arrays.asList(
204+
new BsonString("v1-a"),
205+
new BsonString("v1-b"))))
206+
.append("k2", new BsonArray(Arrays.asList(
207+
new BsonString("v2-a"))))
208+
.append("k3", new BsonArray(Arrays.asList(
209+
new BsonString("v3-a"),
210+
new BsonString("v3-b"),
211+
new BsonString("v3-c")))))
212+
.append("myMapOfInts", new BsonDocument("k1", new BsonInt32(9))
213+
.append("k1", new BsonArray(Arrays.asList(
214+
new BsonInt32(11),
215+
new BsonInt32(12))))
216+
.append("k2", new BsonArray(Arrays.asList(
217+
new BsonInt32(21))))
218+
.append("k3", new BsonArray(Arrays.asList(
219+
new BsonInt32(31),
220+
new BsonInt32(32),
221+
new BsonInt32(33)))))
177222
.append("myBytes", new BsonBinary(new byte[]{75, 97, 102, 107, 97, 32, 114, 111, 99, 107, 115, 33}))
178223
.append("myDate", new BsonDateTime(
179224
java.util.Date.from(ZonedDateTime.of(
@@ -192,7 +237,10 @@ public static void initializeTestData() {
192237
))
193238
.append("myDecimal", new BsonDecimal128(new Decimal128(new BigDecimal("12345.6789"))));
194239

195-
EXPECTED_BSON_DOC_RAW_1 = EXPECTED_BSON_DOC_BYTES_1.clone();
240+
EXPECTED_BSON_DOC_OBJ_STRUCT_1 = commonMapAndStructFields.clone();
241+
EXPECTED_BSON_DOC_OBJ_MAP_1 = commonMapAndStructFields.clone();
242+
243+
EXPECTED_BSON_DOC_RAW_1 = commonMapAndStructFields.clone();
196244
EXPECTED_BSON_DOC_RAW_1.replace("myBytes",new BsonString("S2Fma2Egcm9ja3Mh"));
197245
EXPECTED_BSON_DOC_RAW_1.replace("myDate",new BsonInt64(1489705200000L));
198246
EXPECTED_BSON_DOC_RAW_1.replace("myTimestamp",new BsonInt64(1489705200000L));
@@ -216,7 +264,7 @@ public void testJsonRawStringConversion() {
216264
public void testAvroOrJsonWithSchemaConversion() {
217265
RecordConverter converter = new AvroJsonSchemafulRecordConverter();
218266
assertAll("",
219-
() -> assertEquals(EXPECTED_BSON_DOC_BYTES_1, converter.convert(OBJ_SCHEMA_1, OBJ_STRUCT_1)),
267+
() -> assertEquals(EXPECTED_BSON_DOC_OBJ_STRUCT_1, converter.convert(OBJ_SCHEMA_1, OBJ_STRUCT_1)),
220268
() -> assertThrows(DataException.class, () -> converter.convert(OBJ_SCHEMA_1,null)),
221269
() -> assertThrows(DataException.class, () -> converter.convert(null, OBJ_STRUCT_1)),
222270
() -> assertThrows(DataException.class, () -> converter.convert(null,null))
@@ -228,7 +276,7 @@ public void testAvroOrJsonWithSchemaConversion() {
228276
public void testJsonObjectConversion() {
229277
RecordConverter converter = new JsonSchemalessRecordConverter();
230278
assertAll("",
231-
() -> assertEquals(EXPECTED_BSON_DOC_BYTES_1, converter.convert(null, OBJ_MAP_1)),
279+
() -> assertEquals(EXPECTED_BSON_DOC_OBJ_MAP_1, converter.convert(null, OBJ_MAP_1)),
232280
() -> assertThrows(DataException.class, () -> converter.convert(null,null))
233281
);
234282
}

0 commit comments

Comments
 (0)