Skip to content

Add test case for new fields appearing in data #15372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 10, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,57 @@ public void testEntrypointEnvVar() throws Exception {
assertFalse(entrypoint.isBlank());
}

/**
* Verify that destination doesn't fail if new fields arrive in the data after initial schema
* discovery and sync.
*
* @throws Exception
*/
@Test
public void testSyncNotFailsWithNewFields() throws Exception {
if (!implementsOverwrite()) {
LOGGER.info("Destination's spec.json does not support overwrite sync mode.");
return;
}

final AirbyteCatalog catalog =
Jsons.deserialize(MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);

final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
final JsonNode config = getConfig();
runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, false);
final var stream = catalog.getStreams().get(0);

// Run second sync with new fields on the message
final List<AirbyteMessage> secondSyncMessagesWithNewFields = Lists.newArrayList(
new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(stream.getName())
.withEmittedAt(Instant.now().toEpochMilli())
.withData(Jsons.jsonNode(ImmutableMap.builder()
.put("id", 1)
.put("currency", "USD")
.put("date", "2020-03-31T00:00:00Z")
.put("newFieldString", "Value for new field")
.put("newFieldNumber", 3)
.put("HKD", 10.1)
.put("NZD", 700.1)
.build()))),
new AirbyteMessage()
.withType(Type.STATE)
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2)))));

// Run sync and verify that all message were written without failing
runSyncAndVerifyStateOutput(config, secondSyncMessagesWithNewFields, configuredCatalog, false);
var destinationOutput = retrieveRecords(testEnv, stream.getName(), getDefaultSchema(config), stream.getJsonSchema());
// Remove state message
secondSyncMessagesWithNewFields.removeIf(airbyteMessage -> airbyteMessage.getType().equals(Type.STATE));
assertEquals(secondSyncMessagesWithNewFields.size(), destinationOutput.size());
}

/**
* Whether the destination should be tested against different namespaces.
*/
Expand Down