Skip to content

Commit 729f453

Browse files
add support for maps with list values
1 parent 66c3405 commit 729f453

File tree

1 file changed

+34
-21
lines changed

1 file changed

+34
-21
lines changed

src/main/java/at/grahsl/kafka/connect/mongodb/converter/AvroJsonSchemafulRecordConverter.java

+34-21
Original file line numberDiff line numberDiff line change
@@ -113,55 +113,68 @@ private void processField(BsonDocument doc, Struct struct, Field field) {
113113
case INT64:
114114
case STRING:
115115
case BYTES:
116-
handlePrimitiveField(doc, struct, field);
116+
handlePrimitiveField(doc, struct.get(field), field);
117117
break;
118118
case STRUCT:
119-
handleStructField(doc, struct, field);
119+
handleStructField(doc, (Struct)struct.get(field), field);
120120
break;
121121
case ARRAY:
122-
handleArrayField(doc, struct, field);
122+
handleArrayField(doc, (List)struct.get(field), field);
123123
break;
124124
case MAP:
125-
handleMapField(doc, struct, field);
125+
handleMapField(doc, (Map)struct.get(field), field);
126126
break;
127127
default:
128+
logger.error("Invalid schema. unexpected / unsupported schema type '"
129+
+ field.schema().type() + "' for field '"
130+
+ field.name() + "' value='" + struct + "'");
128131
throw new DataException("unexpected / unsupported schema type " + field.schema().type());
129132
}
130133
} catch (Exception exc) {
134+
logger.error("Error while processing field. schema type '"
135+
+ field.schema().type() + "' for field '"
136+
+ field.name() + "' value='" + struct + "'");
131137
throw new DataException("error while processing field " + field.name(), exc);
132138
}
133139

134140
}
135141

136-
private void handleMapField(BsonDocument doc, Struct struct, Field field) {
142+
private void handleMapField(BsonDocument doc, Map m, Field field) {
137143
logger.trace("handling complex type 'map'");
138-
BsonDocument bd = new BsonDocument();
139-
if(struct.get(field)==null) {
144+
if(m==null) {
140145
logger.trace("no field in struct -> adding null");
141146
doc.put(field.name(), BsonNull.VALUE);
142147
return;
143148
}
144-
Map m = (Map)struct.get(field);
149+
BsonDocument bd = new BsonDocument();
145150
for(Object entry : m.keySet()) {
146151
String key = (String)entry;
147-
if(field.schema().valueSchema().type().isPrimitive()) {
152+
Schema.Type valueSchemaType = field.schema().valueSchema().type();
153+
if(valueSchemaType.isPrimitive()) {
148154
bd.put(key, getConverter(field.schema().valueSchema()).toBson(m.get(key),field.schema()));
155+
} else if (valueSchemaType.equals(Schema.Type.ARRAY)) {
156+
final Field elementField = new Field(key, 0, field.schema().valueSchema());
157+
final List list = (List)m.get(key);
158+
logger.trace("adding array values to {} of type valueSchema={} value='{}'",
159+
elementField.name(), elementField.schema().valueSchema(), list);
160+
handleArrayField(bd, list, elementField);
149161
} else {
150162
bd.put(key, toBsonDoc(field.schema().valueSchema(), m.get(key)));
151163
}
152164
}
153165
doc.put(field.name(), bd);
154166
}
155167

156-
private void handleArrayField(BsonDocument doc, Struct struct, Field field) {
157-
logger.trace("handling complex type 'array'");
158-
BsonArray array = new BsonArray();
159-
if(struct.get(field)==null) {
160-
logger.trace("no field in struct -> adding null");
168+
private void handleArrayField(BsonDocument doc, List list, Field field) {
169+
logger.trace("handling complex type 'array' of types '{}'",
170+
field.schema().valueSchema().type());
171+
if(list==null) {
172+
logger.trace("no array -> adding null");
161173
doc.put(field.name(), BsonNull.VALUE);
162174
return;
163175
}
164-
for(Object element : (List)struct.get(field)) {
176+
BsonArray array = new BsonArray();
177+
for(Object element : list) {
165178
if(field.schema().valueSchema().type().isPrimitive()) {
166179
array.add(getConverter(field.schema().valueSchema()).toBson(element,field.schema()));
167180
} else {
@@ -173,18 +186,18 @@ private void handleArrayField(BsonDocument doc, Struct struct, Field field) {
173186

174187
private void handleStructField(BsonDocument doc, Struct struct, Field field) {
175188
logger.trace("handling complex type 'struct'");
176-
if(struct.get(field)!=null) {
177-
logger.trace(struct.get(field).toString());
178-
doc.put(field.name(), toBsonDoc(field.schema(), struct.get(field)));
189+
if(struct!=null) {
190+
logger.trace(struct.toString());
191+
doc.put(field.name(), toBsonDoc(field.schema(), struct));
179192
} else {
180193
logger.trace("no field in struct -> adding null");
181194
doc.put(field.name(), BsonNull.VALUE);
182195
}
183196
}
184197

185-
private void handlePrimitiveField(BsonDocument doc, Struct struct, Field field) {
186-
logger.trace("handling primitive type '{}'",field.schema().type());
187-
doc.put(field.name(), getConverter(field.schema()).toBson(struct.get(field),field.schema()));
198+
private void handlePrimitiveField(BsonDocument doc, Object value, Field field) {
199+
logger.trace("handling primitive type '{}' name='{}'",field.schema().type(),field.name());
200+
doc.put(field.name(), getConverter(field.schema()).toBson(value,field.schema()));
188201
}
189202

190203
private boolean isSupportedLogicalType(Schema schema) {

0 commit comments

Comments
 (0)