Skip to content

fixing a bug related to collection-unaware/topic-agnostic CDC config #83

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>at.grahsl.kafka.connect</groupId>
<artifactId>kafka-connect-mongodb</artifactId>
<version>1.3.1</version>
<version>1.3.2-SNAPSHOT</version>
<packaging>jar</packaging>

<name>kafka-connect-mongodb</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,10 @@ public Map<String,CdcHandler> getCdcHandlers() {

Map<String, CdcHandler> cdcHandlers = new HashMap<>();

if(isUsingCdcHandler("")) {
cdcHandlers.put(TOPIC_AGNOSTIC_KEY_NAME,getCdcHandler(""));
}

splitAndTrimAndRemoveConfigListEntries(getString(MONGODB_COLLECTIONS_CONF))
.forEach(collection -> {
CdcHandler candidate = cdcHandlers.put(collection,getCdcHandler(collection));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ Map<String, MongoDbSinkRecordBatches> createSinkRecordBatchesPerTopic(Collection
LOGGER.debug("building CDC write model for {} record(s) into collection {}", records.size(), collectionName);
return records.stream()
.map(sinkConverter::convert)
.map(cdcHandlers.get(collectionName)::handle)
.map(cdcHandlers.getOrDefault(collectionName,
cdcHandlers.get(MongoDbSinkConnectorConfig.TOPIC_AGNOSTIC_KEY_NAME))::handle)
.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty))
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import at.grahsl.kafka.connect.mongodb.cdc.CdcHandler;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.mysql.MysqlHandler;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.postgres.PostgresHandler;
import at.grahsl.kafka.connect.mongodb.processor.*;
Expand Down Expand Up @@ -615,7 +616,7 @@ public Collection<DynamicTest> testGetMultipleCollectionSpecificValidWriteModelS

List<DynamicTest> tests = new ArrayList<>();

Map<String, Class> canditates = new HashMap<String, Class>() {{
Map<String, Class> candidates = new HashMap<String, Class>() {{
put("", ReplaceOneDefaultStrategy.class);
put("collection-1", ReplaceOneDefaultStrategy.class);
put("collection-2", ReplaceOneBusinessKeyStrategy.class);
Expand All @@ -624,10 +625,10 @@ public Collection<DynamicTest> testGetMultipleCollectionSpecificValidWriteModelS
}};

HashMap<String,String> map = new HashMap<>();
canditates.entrySet().forEach(entry ->
candidates.entrySet().forEach(entry ->
map.put(MONGODB_WRITEMODEL_STRATEGY+"."+entry.getKey(),entry.getValue().getName())
);
map.put(MONGODB_COLLECTIONS_CONF, canditates.keySet().stream()
map.put(MONGODB_COLLECTIONS_CONF, candidates.keySet().stream()
.collect(Collectors.joining(FIELD_LIST_SPLIT_CHAR)));

MongoDbSinkConnectorConfig cfg = new MongoDbSinkConnectorConfig(map);
Expand All @@ -640,9 +641,9 @@ public Collection<DynamicTest> testGetMultipleCollectionSpecificValidWriteModelS
: "check write model strategy for config "+MONGODB_WRITEMODEL_STRATEGY +"."+entry.getKey(),
() -> assertAll("check for non-null and correct type",
() -> assertNotNull(entry.getValue(), "write model strategy was null"),
() -> assertTrue(canditates.get(TOPIC_AGNOSTIC_KEY_NAME.equals(entry.getKey())
() -> assertTrue(candidates.get(TOPIC_AGNOSTIC_KEY_NAME.equals(entry.getKey())
? "" : entry.getKey()).isInstance(entry.getValue()),
"write model strategy NOT of type " + canditates.get(entry.getKey()))
"write model strategy NOT of type " + candidates.get(entry.getKey()))
))
)
);
Expand All @@ -658,6 +659,7 @@ public Collection<DynamicTest> testGetSingleValidCdcHandler() {

HashMap<String,Class> candidates = new HashMap<String,Class>() {{
put(MongoDbHandler.class.getName(),MongoDbHandler.class);
put(RdbmsHandler.class.getName(), RdbmsHandler.class);
put(MysqlHandler.class.getName(),MysqlHandler.class);
put(PostgresHandler.class.getName(),PostgresHandler.class);
}};
Expand All @@ -666,8 +668,8 @@ public Collection<DynamicTest> testGetSingleValidCdcHandler() {
HashMap<String,String> map = new HashMap<>();
map.put(MONGODB_CHANGE_DATA_CAPTURE_HANDLER,entry.getKey());
MongoDbSinkConnectorConfig cfg = new MongoDbSinkConnectorConfig(map);
CdcHandler cdc = cfg.getCdcHandler();
tests.add(dynamicTest("check cdc handler for config"
CdcHandler cdc = cfg.getCdcHandler("");
tests.add(dynamicTest("check cdc handler for config "
+ MONGODB_CHANGE_DATA_CAPTURE_HANDLER + "="+entry.getKey(),
() -> assertAll("check for non-null and correct type",
() -> assertNotNull(cdc, "cdc handler was null"),
Expand All @@ -677,9 +679,9 @@ public Collection<DynamicTest> testGetSingleValidCdcHandler() {
));
});

tests.add(dynamicTest("check cdc handler for config"
tests.add(dynamicTest("check cdc handler for config "
+ MONGODB_CHANGE_DATA_CAPTURE_HANDLER + "=",
() -> assertNull(new MongoDbSinkConnectorConfig(new HashMap<>()).getCdcHandler(),
() -> assertNull(new MongoDbSinkConnectorConfig(new HashMap<>()).getCdcHandler(""),
"cdc handler was not null")
)
);
Expand All @@ -693,34 +695,70 @@ public Collection<DynamicTest> testGetMultipleCollectionSpecificValidCdcHandlers

List<DynamicTest> tests = new ArrayList<>();

Map<String, Class> canditates = new HashMap<String, Class>() {{
Map<String, Class> candidates = new HashMap<String, Class>() {{
put("collection-1", MongoDbHandler.class);
put("collection-2", MysqlHandler.class);
put("collection-3", PostgresHandler.class);
put("collection-2", RdbmsHandler.class);
put("collection-3", MysqlHandler.class);
put("collection-4", PostgresHandler.class);
}};

HashMap<String,String> map = new HashMap<>();
canditates.entrySet().forEach(entry ->
candidates.entrySet().forEach(entry ->
map.put(MONGODB_CHANGE_DATA_CAPTURE_HANDLER+"."+entry.getKey(),entry.getValue().getName())
);
map.put(MONGODB_COLLECTIONS_CONF, canditates.keySet().stream()
map.put(MONGODB_COLLECTIONS_CONF, candidates.keySet().stream()
.collect(Collectors.joining(FIELD_LIST_SPLIT_CHAR)));

MongoDbSinkConnectorConfig cfg = new MongoDbSinkConnectorConfig(map);
Map<String, CdcHandler> cdc = cfg.getCdcHandlers();

cdc.entrySet().forEach(entry ->
tests.add(dynamicTest("check cdc handler for config " +
MONGODB_CHANGE_DATA_CAPTURE_HANDLER +"."+entry.getKey(),
MONGODB_CHANGE_DATA_CAPTURE_HANDLER +"."+entry.getKey()
+"="+entry.getValue().getClass().getName(),
() -> assertAll("check for non-null and correct type",
() -> assertNotNull(entry.getValue(), "cdc handler was null"),
() -> assertTrue(canditates.get(entry.getKey()).isInstance(entry.getValue()),
"cdc handler NOT of type " + canditates.get(entry.getKey()))
() -> assertTrue(candidates.get(entry.getKey()).isInstance(entry.getValue()),
"cdc handler NOT of type " + candidates.get(entry.getKey()))
))
)
);

return tests;
}

@TestFactory
@DisplayName("test collection unspecific CDC handlers")
public Collection<DynamicTest> testCollectionUnspecificCdcHandlers() {

List<DynamicTest> tests = new ArrayList<>();

Set<Class> candidates = new HashSet<Class>() {{
add(MongoDbHandler.class);
add(RdbmsHandler.class);
add(MysqlHandler.class);
add(PostgresHandler.class);
}};

candidates.forEach(entry -> {

HashMap<String,String> map = new HashMap<>();
map.put(MONGODB_CHANGE_DATA_CAPTURE_HANDLER,entry.getName());
map.put(MONGODB_COLLECTION_CONF,"whatever");
MongoDbSinkConnectorConfig cfg = new MongoDbSinkConnectorConfig(map);
Map<String, CdcHandler> cdc = cfg.getCdcHandlers();
CdcHandler which = cdc.getOrDefault(cfg.getString(MONGODB_COLLECTION_CONF),cdc.get(TOPIC_AGNOSTIC_KEY_NAME));
tests.add(dynamicTest("check cdc handler for config " +
MONGODB_CHANGE_DATA_CAPTURE_HANDLER+"="+entry.getName(),
() -> assertAll("check for non-null and correct type",
() -> assertNotNull(which, "cdc handler was null"),
() -> assertTrue(entry.isInstance(which),
"cdc handler NOT of type " + entry)
)));
});

return tests;

}

}