Skip to content

add support for maps with list values #86

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 21, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,55 +113,68 @@ private void processField(BsonDocument doc, Struct struct, Field field) {
case INT64:
case STRING:
case BYTES:
handlePrimitiveField(doc, struct, field);
handlePrimitiveField(doc, struct.get(field), field);
break;
case STRUCT:
handleStructField(doc, struct, field);
handleStructField(doc, (Struct)struct.get(field), field);
break;
case ARRAY:
handleArrayField(doc, struct, field);
handleArrayField(doc, (List)struct.get(field), field);
break;
case MAP:
handleMapField(doc, struct, field);
handleMapField(doc, (Map)struct.get(field), field);
break;
default:
logger.error("Invalid schema. unexpected / unsupported schema type '"
+ field.schema().type() + "' for field '"
+ field.name() + "' value='" + struct + "'");
throw new DataException("unexpected / unsupported schema type " + field.schema().type());
}
} catch (Exception exc) {
logger.error("Error while processing field. schema type '"
+ field.schema().type() + "' for field '"
+ field.name() + "' value='" + struct + "'");
throw new DataException("error while processing field " + field.name(), exc);
}

}

private void handleMapField(BsonDocument doc, Struct struct, Field field) {
private void handleMapField(BsonDocument doc, Map m, Field field) {
logger.trace("handling complex type 'map'");
BsonDocument bd = new BsonDocument();
if(struct.get(field)==null) {
if(m==null) {
logger.trace("no field in struct -> adding null");
doc.put(field.name(), BsonNull.VALUE);
return;
}
Map m = (Map)struct.get(field);
BsonDocument bd = new BsonDocument();
for(Object entry : m.keySet()) {
String key = (String)entry;
if(field.schema().valueSchema().type().isPrimitive()) {
Schema.Type valueSchemaType = field.schema().valueSchema().type();
if(valueSchemaType.isPrimitive()) {
bd.put(key, getConverter(field.schema().valueSchema()).toBson(m.get(key),field.schema()));
} else if (valueSchemaType.equals(Schema.Type.ARRAY)) {
final Field elementField = new Field(key, 0, field.schema().valueSchema());
final List list = (List)m.get(key);
logger.trace("adding array values to {} of type valueSchema={} value='{}'",
elementField.name(), elementField.schema().valueSchema(), list);
handleArrayField(bd, list, elementField);
} else {
bd.put(key, toBsonDoc(field.schema().valueSchema(), m.get(key)));
}
}
doc.put(field.name(), bd);
}

private void handleArrayField(BsonDocument doc, Struct struct, Field field) {
logger.trace("handling complex type 'array'");
BsonArray array = new BsonArray();
if(struct.get(field)==null) {
logger.trace("no field in struct -> adding null");
private void handleArrayField(BsonDocument doc, List list, Field field) {
logger.trace("handling complex type 'array' of types '{}'",
field.schema().valueSchema().type());
if(list==null) {
logger.trace("no array -> adding null");
doc.put(field.name(), BsonNull.VALUE);
return;
}
for(Object element : (List)struct.get(field)) {
BsonArray array = new BsonArray();
for(Object element : list) {
if(field.schema().valueSchema().type().isPrimitive()) {
array.add(getConverter(field.schema().valueSchema()).toBson(element,field.schema()));
} else {
Expand All @@ -173,18 +186,18 @@ private void handleArrayField(BsonDocument doc, Struct struct, Field field) {

private void handleStructField(BsonDocument doc, Struct struct, Field field) {
logger.trace("handling complex type 'struct'");
if(struct.get(field)!=null) {
logger.trace(struct.get(field).toString());
doc.put(field.name(), toBsonDoc(field.schema(), struct.get(field)));
if(struct!=null) {
logger.trace(struct.toString());
doc.put(field.name(), toBsonDoc(field.schema(), struct));
} else {
logger.trace("no field in struct -> adding null");
doc.put(field.name(), BsonNull.VALUE);
}
}

private void handlePrimitiveField(BsonDocument doc, Struct struct, Field field) {
logger.trace("handling primitive type '{}'",field.schema().type());
doc.put(field.name(), getConverter(field.schema()).toBson(struct.get(field),field.schema()));
private void handlePrimitiveField(BsonDocument doc, Object value, Field field) {
logger.trace("handling primitive type '{}' name='{}'",field.schema().type(),field.name());
doc.put(field.name(), getConverter(field.schema()).toBson(value,field.schema()));
}

private boolean isSupportedLogicalType(Schema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;
Expand All @@ -45,7 +46,8 @@ public class RecordConverterTest {
public static Schema OBJ_SCHEMA_1;
public static Struct OBJ_STRUCT_1;
public static Map OBJ_MAP_1;
public static BsonDocument EXPECTED_BSON_DOC_BYTES_1;
public static BsonDocument EXPECTED_BSON_DOC_OBJ_STRUCT_1;
public static BsonDocument EXPECTED_BSON_DOC_OBJ_MAP_1;
public static BsonDocument EXPECTED_BSON_DOC_RAW_1;

@BeforeAll
Expand Down Expand Up @@ -82,6 +84,8 @@ public static void initializeTestData() {
.build())
)
.field("mySubDoc2", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build())
.field( "myMapOfStrings", SchemaBuilder.map(Schema.STRING_SCHEMA, SchemaBuilder.array(Schema.STRING_SCHEMA).build()).build())
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi again. sorry it took me some time to get back on this...
I'm really looking forward to merging this. Let me suggest one more small aspect which is that I think for consistency reasons of this test we should also add the two maps to the JSON_STRING_1 above at right before the "myBytes" field

see 0c0ad67#diff-ab414b445c07f6db6f112afe2ca89aafL63

could you maybe please do it directly in your branch? then it's the least effort I guess to get it merged :)

.field( "myMapOfInts", SchemaBuilder.map(Schema.STRING_SCHEMA, SchemaBuilder.array(Schema.INT32_SCHEMA).build()).build())
.field("myBytes", Schema.BYTES_SCHEMA)
.field("myDate", Date.SCHEMA)
.field("myTimestamp", Timestamp.SCHEMA)
Expand All @@ -108,6 +112,16 @@ public static void initializeTestData() {
)
)
.put("mySubDoc2",new HashMap<String,Integer>(){{ put("k1",9); put("k2",8); put("k3",7);}})
.put("myMapOfStrings", new HashMap<String, List<String>>(){{
put("k1", Arrays.asList("v1-a", "v1-b"));
put("k2", Arrays.asList("v2-a"));
put("k3", Arrays.asList("v3-a", "v3-b", "v3-c"));
}})
.put("myMapOfInts", new HashMap<String, List<Integer>>(){{
put("k1", Arrays.asList(11, 12));
put("k2", Arrays.asList(21));
put("k3", Arrays.asList(31, 32, 33));
}})
.put("myBytes", new byte[]{75, 97, 102, 107, 97, 32, 114, 111, 99, 107, 115, 33})
.put("myDate", java.util.Date.from(ZonedDateTime.of(
LocalDate.of(2017,3,17), LocalTime.MIDNIGHT, ZoneOffset.systemDefault()
Expand Down Expand Up @@ -154,7 +168,7 @@ public static void initializeTestData() {
// thus I'm cheating a little by using a Decimal128 here...
OBJ_MAP_1.put("myDecimal", Decimal128.parse("12345.6789"));

EXPECTED_BSON_DOC_BYTES_1 = new BsonDocument()
BsonDocument commonMapAndStructFields = new BsonDocument()
.append("_id", new BsonString("1234567890"))
.append("myString", new BsonString("some foo bla text"))
.append("myInt", new BsonInt32(42))
Expand Down Expand Up @@ -192,7 +206,32 @@ public static void initializeTestData() {
))
.append("myDecimal", new BsonDecimal128(new Decimal128(new BigDecimal("12345.6789"))));

EXPECTED_BSON_DOC_RAW_1 = EXPECTED_BSON_DOC_BYTES_1.clone();
EXPECTED_BSON_DOC_OBJ_STRUCT_1 = commonMapAndStructFields.clone()
.append("myMapOfStrings", new BsonDocument("k1", new BsonInt32(9))
.append("k1", new BsonArray(Arrays.asList(
new BsonString("v1-a"),
new BsonString("v1-b"))))
.append("k2", new BsonArray(Arrays.asList(
new BsonString("v2-a"))))
.append("k3", new BsonArray(Arrays.asList(
new BsonString("v3-a"),
new BsonString("v3-b"),
new BsonString("v3-c"))))
).append("myMapOfInts", new BsonDocument("k1", new BsonInt32(9))
.append("k1", new BsonArray(Arrays.asList(
new BsonInt32(11),
new BsonInt32(12))))
.append("k2", new BsonArray(Arrays.asList(
new BsonInt32(21))))
.append("k3", new BsonArray(Arrays.asList(
new BsonInt32(31),
new BsonInt32(32),
new BsonInt32(33))))
);

EXPECTED_BSON_DOC_OBJ_MAP_1 = commonMapAndStructFields.clone();

EXPECTED_BSON_DOC_RAW_1 = commonMapAndStructFields.clone();
EXPECTED_BSON_DOC_RAW_1.replace("myBytes",new BsonString("S2Fma2Egcm9ja3Mh"));
EXPECTED_BSON_DOC_RAW_1.replace("myDate",new BsonInt64(1489705200000L));
EXPECTED_BSON_DOC_RAW_1.replace("myTimestamp",new BsonInt64(1489705200000L));
Expand All @@ -216,7 +255,7 @@ public void testJsonRawStringConversion() {
public void testAvroOrJsonWithSchemaConversion() {
RecordConverter converter = new AvroJsonSchemafulRecordConverter();
assertAll("",
() -> assertEquals(EXPECTED_BSON_DOC_BYTES_1, converter.convert(OBJ_SCHEMA_1, OBJ_STRUCT_1)),
() -> assertEquals(EXPECTED_BSON_DOC_OBJ_STRUCT_1, converter.convert(OBJ_SCHEMA_1, OBJ_STRUCT_1)),
() -> assertThrows(DataException.class, () -> converter.convert(OBJ_SCHEMA_1,null)),
() -> assertThrows(DataException.class, () -> converter.convert(null, OBJ_STRUCT_1)),
() -> assertThrows(DataException.class, () -> converter.convert(null,null))
Expand All @@ -228,7 +267,7 @@ public void testAvroOrJsonWithSchemaConversion() {
public void testJsonObjectConversion() {
RecordConverter converter = new JsonSchemalessRecordConverter();
assertAll("",
() -> assertEquals(EXPECTED_BSON_DOC_BYTES_1, converter.convert(null, OBJ_MAP_1)),
() -> assertEquals(EXPECTED_BSON_DOC_OBJ_MAP_1, converter.convert(null, OBJ_MAP_1)),
() -> assertThrows(DataException.class, () -> converter.convert(null,null))
);
}
Expand Down