Skip to content

Commit 0a027a1

Browse files
🐛 Fixed test for gcs, csv, json-local, mongodb and meilisearch destination (#6134)
* updated gcs, csv and localJson tests * added emitted_at to MeiliSearchDestination * updated testSyncWithLargeRecordBatch test * added comment to meiliSerch destination test * fixed remarks
1 parent 13e8be5 commit 0a027a1

File tree

9 files changed

+23
-8
lines changed

9 files changed

+23
-8
lines changed

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/af7c921e-5892-4ff2-b6c1-4a5ab258fb7e.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"destinationDefinitionId": "af7c921e-5892-4ff2-b6c1-4a5ab258fb7e",
33
"name": "MeiliSearch",
44
"dockerRepository": "airbyte/destination-meilisearch",
5-
"dockerImageTag": "0.2.9",
5+
"dockerImageTag": "0.2.10",
66
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/meilisearch"
77
}

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
5959
name: MeiliSearch
6060
dockerRepository: airbyte/destination-meilisearch
61-
dockerImageTag: 0.2.9
61+
dockerImageTag: 0.2.10
6262
documentationUrl: https://docs.airbyte.io/integrations/destinations/meilisearch
6363
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
6464
name: MySQL

airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,10 +383,9 @@ public void testSyncWithLargeRecordBatch(String messagesFilename, String catalog
383383
final List<AirbyteMessage> messages = MoreResources.readResource(messagesFilename).lines()
384384
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
385385

386-
final List<AirbyteMessage> largeNumberRecords = Collections.nCopies(1000, messages).stream().flatMap(List::stream).collect(Collectors.toList());
386+
final List<AirbyteMessage> largeNumberRecords = Collections.nCopies(400, messages).stream().flatMap(List::stream).collect(Collectors.toList());
387387

388388
final JsonNode config = getConfig();
389-
final String defaultSchema = getDefaultSchema(config);
390389
runSyncAndVerifyStateOutput(config, largeNumberRecords, configuredCatalog, false);
391390
}
392391

airbyte-integrations/connectors/destination-csv/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationAcceptanceTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,10 @@ protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
8080
JsonNode streamSchema)
8181
throws Exception {
8282
final List<Path> allOutputs = Files.list(testEnv.getLocalRoot().resolve(RELATIVE_PATH)).collect(Collectors.toList());
83+
8384
final Optional<Path> streamOutput =
84-
allOutputs.stream().filter(path -> path.getFileName().toString().contains(new StandardNameTransformer().getRawTableName(streamName)))
85+
allOutputs.stream()
86+
.filter(path -> path.getFileName().toString().endsWith(new StandardNameTransformer().getRawTableName(streamName) + ".csv"))
8587
.findFirst();
8688

8789
assertTrue(streamOutput.isPresent(), "could not find output file for stream: " + streamName);

airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import org.slf4j.Logger;
4848
import org.slf4j.LoggerFactory;
4949

50+
import static io.airbyte.integrations.destination.s3.S3DestinationConstants.NAME_TRANSFORMER;
51+
5052
/**
5153
* When adding a new GCS destination acceptance test, extend this class and do the following:
5254
* <li>Implement {@link #getFormatConfig} that returns a {@link S3FormatConfig}</li>
@@ -107,6 +109,7 @@ protected List<S3ObjectSummary> getAllSyncedObjects(String streamName, String na
107109
.listObjects(config.getBucketName(), outputPrefix)
108110
.getObjectSummaries()
109111
.stream()
112+
.filter(o -> o.getKey().contains(NAME_TRANSFORMER.convertStreamName(streamName) + "/"))
110113
.sorted(Comparator.comparingLong(o -> o.getLastModified().getTime()))
111114
.collect(Collectors.toList());
112115
LOGGER.info(

airbyte-integrations/connectors/destination-local-json/src/test-integration/java/io/airbyte/integrations/destination/local_json/LocalJsonDestinationAcceptanceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
7777
throws Exception {
7878
final List<Path> allOutputs = Files.list(testEnv.getLocalRoot().resolve(RELATIVE_PATH)).collect(Collectors.toList());
7979
final Optional<Path> streamOutput = allOutputs.stream()
80-
.filter(path -> path.getFileName().toString().contains(new StandardNameTransformer().getRawTableName(streamName)))
80+
.filter(path -> path.getFileName().toString().endsWith(new StandardNameTransformer().getRawTableName(streamName) + ".jsonl"))
8181
.findFirst();
8282

8383
assertTrue(streamOutput.isPresent(), "could not find output file for stream: " + streamName);

airbyte-integrations/connectors/destination-meilisearch/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.2.9
11+
LABEL io.airbyte.version=0.2.10
1212
LABEL io.airbyte.name=airbyte/destination-meilisearch

airbyte-integrations/connectors/destination-meilisearch/src/main/java/io/airbyte/integrations/destination/meilisearch/MeiliSearchDestination.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,15 @@
4545
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
4646
import io.airbyte.protocol.models.DestinationSyncMode;
4747
import java.time.Instant;
48+
import java.time.LocalDateTime;
49+
import java.time.format.DateTimeFormatter;
4850
import java.util.Arrays;
4951
import java.util.HashMap;
5052
import java.util.Map;
5153
import java.util.UUID;
5254
import java.util.function.Consumer;
5355
import java.util.stream.Collectors;
56+
5457
import org.slf4j.Logger;
5558
import org.slf4j.LoggerFactory;
5659

@@ -83,8 +86,10 @@ public class MeiliSearchDestination extends BaseConnector implements Destination
8386
private static final Logger LOGGER = LoggerFactory.getLogger(MeiliSearchDestination.class);
8487

8588
private static final int MAX_BATCH_SIZE = 10000;
89+
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSSSSSSS");
8690

8791
public static final String AB_PK_COLUMN = "_ab_pk";
92+
public static final String AB_EMITTED_AT_COLUMN = "_ab_emitted_at";
8893

8994
@Override
9095
public AirbyteConnectionStatus check(JsonNode config) {
@@ -164,6 +169,7 @@ private static RecordWriter recordWriterFunction(final Map<String, Index> indexN
164169
.stream()
165170
.map(AirbyteRecordMessage::getData)
166171
.peek(o -> ((ObjectNode) o).put(AB_PK_COLUMN, Names.toAlphanumericAndUnderscore(UUID.randomUUID().toString())))
172+
.peek(o -> ((ObjectNode) o).put(AB_EMITTED_AT_COLUMN, LocalDateTime.now().format(FORMATTER)))
167173
.collect(Collectors.toList()));
168174
final String s = index.addDocuments(json);
169175
LOGGER.info("add docs response {}", s);

airbyte-integrations/connectors/destination-meilisearch/src/test-integration/java/io/airbyte/integrations/destination/meilisearch/MeiliSearchDestinationAcceptanceTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.io.IOException;
3838
import java.nio.file.Files;
3939
import java.nio.file.Path;
40+
import java.util.Comparator;
4041
import java.util.List;
4142
import java.util.stream.Collectors;
4243
import org.testcontainers.containers.GenericContainer;
@@ -101,10 +102,14 @@ protected List<JsonNode> retrieveRecords(TestDestinationEnv env,
101102
final Index index = meiliSearchClient.index(Names.toAlphanumericAndUnderscore(streamName));
102103
final String responseString = index.getDocuments();
103104
final JsonNode response = Jsons.deserialize(responseString);
104-
return MoreStreams.toStream(response.iterator())
105+
return MoreStreams.toStream(response.iterator())
105106
// strip out the airbyte primary key because the test cases only expect the data, no the airbyte
106107
// metadata column.
108+
// We also sort the data by "emitted_at" and then remove that column, because the test cases only expect data,
109+
// not the airbyte metadata column.
107110
.peek(r -> ((ObjectNode) r).remove(MeiliSearchDestination.AB_PK_COLUMN))
111+
.sorted(Comparator.comparing(o -> o.get(MeiliSearchDestination.AB_EMITTED_AT_COLUMN).asText()))
112+
.peek(r -> ((ObjectNode) r).remove(MeiliSearchDestination.AB_EMITTED_AT_COLUMN))
108113
.collect(Collectors.toList());
109114
}
110115

0 commit comments

Comments
 (0)