From cf4d0076e9b3e13d8710676ac07733683f5bcf50 Mon Sep 17 00:00:00 2001 From: Marcos Marx Date: Wed, 18 Aug 2021 17:53:12 -0300 Subject: [PATCH 1/4] enable additioanl params spec.json --- .../253487c0-2246-43ba-a21f-5116b20a2c50.json | 2 +- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-integrations/bases/base-python/base_python/__init__.py | 1 - airbyte-integrations/connectors/source-google-ads/Dockerfile | 2 +- .../connectors/source-google-ads/source_google_ads/spec.json | 2 +- 5 files changed, 4 insertions(+), 5 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/253487c0-2246-43ba-a21f-5116b20a2c50.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/253487c0-2246-43ba-a21f-5116b20a2c50.json index 948977c56cf0c..eb281863af337 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/253487c0-2246-43ba-a21f-5116b20a2c50.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/253487c0-2246-43ba-a21f-5116b20a2c50.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "253487c0-2246-43ba-a21f-5116b20a2c50", "name": "Google Ads", "dockerRepository": "airbyte/source-google-ads", - "dockerImageTag": "0.1.7", + "dockerImageTag": "0.1.8", "documentationUrl": "https://docs.airbyte.io/integrations/sources/google-ads" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 0ed64cfd25002..caf2857964f6b 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -28,7 +28,7 @@ - sourceDefinitionId: 253487c0-2246-43ba-a21f-5116b20a2c50 name: Google Ads dockerRepository: airbyte/source-google-ads - dockerImageTag: 0.1.7 + dockerImageTag: 0.1.8 documentationUrl: https://docs.airbyte.io/integrations/sources/google-ads - sourceDefinitionId: fdc8b827-3257-4b33-83cc-106d234c34d4 name: Google Adwords (Deprecated) diff --git a/airbyte-integrations/bases/base-python/base_python/__init__.py b/airbyte-integrations/bases/base-python/base_python/__init__.py index 4ac13a760af6e..355f36b6dea52 100644 --- a/airbyte-integrations/bases/base-python/base_python/__init__.py +++ b/airbyte-integrations/bases/base-python/base_python/__init__.py @@ -25,7 +25,6 @@ from base_python.catalog_helpers import CatalogHelper from base_python.cdk.abstract_source import AbstractSource - # Separate the SDK imports so they can be moved somewhere else more easily from base_python.cdk.streams.auth.core import HttpAuthenticator from base_python.cdk.streams.auth.oauth import Oauth2Authenticator diff --git a/airbyte-integrations/connectors/source-google-ads/Dockerfile b/airbyte-integrations/connectors/source-google-ads/Dockerfile index 27eb03d82c966..4c4431372a619 100644 --- a/airbyte-integrations/connectors/source-google-ads/Dockerfile +++ b/airbyte-integrations/connectors/source-google-ads/Dockerfile @@ -13,5 +13,5 @@ RUN pip install . ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.7 +LABEL io.airbyte.version=0.1.8 LABEL io.airbyte.name=airbyte/source-google-ads diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json b/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json index 212f9e59b32f7..994f4206c8cf2 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json @@ -5,7 +5,7 @@ "title": "Google Ads Spec", "type": "object", "required": ["credentials", "start_date", "customer_id"], - "additionalProperties": false, + "additionalProperties": true, "properties": { "credentials": { "type": "object", From e4a21fffcdb2df3d95157f5bd297c7be53577ad1 Mon Sep 17 00:00:00 2001 From: Marcos Marx Date: Wed, 18 Aug 2021 19:06:26 -0300 Subject: [PATCH 2/4] removed duplicated tests --- .../bases/base-python/base_python/__init__.py | 1 + .../connectors/source-google-ads/setup.py | 5 +---- .../unit_tests/test_google_ads.py | 19 ++----------------- 3 files changed, 4 insertions(+), 21 deletions(-) diff --git a/airbyte-integrations/bases/base-python/base_python/__init__.py b/airbyte-integrations/bases/base-python/base_python/__init__.py index 355f36b6dea52..4ac13a760af6e 100644 --- a/airbyte-integrations/bases/base-python/base_python/__init__.py +++ b/airbyte-integrations/bases/base-python/base_python/__init__.py @@ -25,6 +25,7 @@ from base_python.catalog_helpers import CatalogHelper from base_python.cdk.abstract_source import AbstractSource + # Separate the SDK imports so they can be moved somewhere else more easily from base_python.cdk.streams.auth.core import HttpAuthenticator from base_python.cdk.streams.auth.oauth import Oauth2Authenticator diff --git a/airbyte-integrations/connectors/source-google-ads/setup.py b/airbyte-integrations/connectors/source-google-ads/setup.py index 28c9d13dde0a0..9fa2efcf1a459 100644 --- a/airbyte-integrations/connectors/source-google-ads/setup.py +++ b/airbyte-integrations/connectors/source-google-ads/setup.py @@ -27,10 +27,7 @@ MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "google-ads", "pendulum"] -TEST_REQUIREMENTS = [ - "pytest~=6.1", - "pytest-mock", -] +TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock", "pendulum"] setup( name="source_google_ads", diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py index 401e2ac5be404..d271c266695a5 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py @@ -23,15 +23,11 @@ # from datetime import date -import pendulum import pendulum from source_google_ads.google_ads import GoogleAds from source_google_ads.streams import IncrementalGoogleAdsStream, chunk_date_range -from source_google_ads.streams import chunk_date_range -from source_google_ads.streams import IncrementalGoogleAdsStream - SAMPLE_SCHEMA = { "properties": { "segment.date": { @@ -121,18 +117,6 @@ def test_get_fields_from_schema(): response = GoogleAds.get_fields_from_schema(SAMPLE_SCHEMA) assert response == ["segment.date"] -def test_interval_chunking(): - mock_intervals = [{'segments.date': '2021-05-18'}, {'segments.date': '2021-06-18'}, {'segments.date': '2021-07-18'}] - intervals = chunk_date_range('2021-06-01', 14, 'segments.date', '2021-08-15') - - assert mock_intervals == intervals - -def test_get_date_params(): - mock_start_date = '2021-05-19' # Please note that this is equal to inputted stream_slice start date + 1 day - mock_end_date = '2021-06-18' - start_date, end_date = IncrementalGoogleAdsStream.get_date_params(stream_slice={'segments.date': '2021-05-18'}, cursor_field='segments.date', end_date=pendulum.parse('2021-08-15')) - - assert mock_start_date == start_date and mock_end_date == end_date def test_interval_chunking(): mock_intervals = [{"segments.date": "2021-05-18"}, {"segments.date": "2021-06-18"}, {"segments.date": "2021-07-18"}] @@ -142,7 +126,8 @@ def test_interval_chunking(): def test_get_date_params(): - mock_start_date = "2021-05-19" # Please note that this is equal to inputted stream_slice start date + 1 day + # Please note that this is equal to inputted stream_slice start date + 1 day + mock_start_date = "2021-05-19" mock_end_date = "2021-06-18" start_date, end_date = IncrementalGoogleAdsStream.get_date_params( stream_slice={"segments.date": "2021-05-18"}, cursor_field="segments.date", end_date=pendulum.parse("2021-08-15") From a3658869ee49a195b9763f5e47f80f93a5a1f829 Mon Sep 17 00:00:00 2001 From: Marcos Marx Date: Wed, 18 Aug 2021 19:26:52 -0300 Subject: [PATCH 3/4] add docs --- docs/integrations/sources/google-ads.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/sources/google-ads.md b/docs/integrations/sources/google-ads.md index 507a61dfc4481..5a38dde6970cb 100644 --- a/docs/integrations/sources/google-ads.md +++ b/docs/integrations/sources/google-ads.md @@ -87,6 +87,7 @@ The Google Ads Query Language can query the Google Ads API. Check out [Google Ad | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| `0.1.8` | 2021-08-03 | [#5509](https://github.com/airbytehq/airbyte/pull/5509) | additionalProperties in spec.json | | `0.1.7` | 2021-08-03 | [#5422](https://github.com/airbytehq/airbyte/pull/5422) | Correct query to not skip dates | | `0.1.6` | 2021-08-03 | [#5423](https://github.com/airbytehq/airbyte/pull/5423) | Added new stream UserLocationReport | | `0.1.5` | 2021-08-03 | [#5159](https://github.com/airbytehq/airbyte/pull/5159) | Add field `login_customer_id` to spec | From e722c1f77fc84b843f32aab9b028b4be7f271cfe Mon Sep 17 00:00:00 2001 From: Marcos Marx Date: Wed, 18 Aug 2021 22:12:45 -0300 Subject: [PATCH 4/4] format keen --- .../keen/KeenCharactersStripper.java | 35 ++++- .../destination/keen/KeenDestination.java | 26 +++- .../destination/keen/KeenHttpClient.java | 43 ++++-- .../destination/keen/KeenRecordsConsumer.java | 25 +++- .../keen/KeenTimestampService.java | 65 ++++++--- .../destination/keen/KeenDestinationTest.java | 36 ++++- .../test/java/KeenTimestampServiceTest.java | 50 +++++-- .../src/test/resources/cursors_catalog.json | 4 +- .../source_chargebee/streams.py | 1 + .../schemas/user_location_report.json | 129 +++++++++--------- 10 files changed, 291 insertions(+), 123 deletions(-) diff --git a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenCharactersStripper.java b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenCharactersStripper.java index bb39d22a0850c..b36b97eacfdc7 100644 --- a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenCharactersStripper.java +++ b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenCharactersStripper.java @@ -1,13 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package io.airbyte.integrations.destination.keen; import org.apache.commons.lang3.StringUtils; public class KeenCharactersStripper { - // Keen collection names can't contain some special characters like non ascii accented characters - // while Kafka Topic names can't contain some other set of special characters, with except for -._ and whitespace characters - public static String stripSpecialCharactersFromStreamName(String streamName) { - return StringUtils.stripAccents(streamName).replaceAll("[^A-Za-z0-9 -._]",""); - } + // Keen collection names can't contain some special characters like non ascii accented characters + // while Kafka Topic names can't contain some other set of special characters, with except for -._ + // and whitespace characters + public static String stripSpecialCharactersFromStreamName(String streamName) { + return StringUtils.stripAccents(streamName).replaceAll("[^A-Za-z0-9 -._]", ""); + } } diff --git a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenDestination.java b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenDestination.java index c8ecdf39b49e2..6b085f7988256 100644 --- a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenDestination.java +++ b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenDestination.java @@ -1,3 +1,26 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ package io.airbyte.integrations.destination.keen; @@ -22,7 +45,6 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.Properties; import java.util.function.Consumer; - import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; @@ -62,7 +84,6 @@ public AirbyteMessageConsumer getConsumer(JsonNode config, return new KeenRecordsConsumer(config, catalog, outputRecordCollector); } - public static void main(String[] args) throws Exception { final Destination destination = new KeenDestination(); LOGGER.info("starting destination: {}", KeenDestination.class); @@ -86,6 +107,7 @@ public static KafkaProducer create(String projectId, String apiK props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); return new KafkaProducer<>(props); } + } } diff --git a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenHttpClient.java b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenHttpClient.java index c90bd24f0b202..e2414218ac5bc 100644 --- a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenHttpClient.java +++ b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenHttpClient.java @@ -1,3 +1,27 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package io.airbyte.integrations.destination.keen; import static io.airbyte.integrations.destination.keen.KeenDestination.KEEN_BASE_API_PATH; @@ -36,8 +60,7 @@ public void eraseStream(String streamToDelete, String projectId, String apiKey, URI deleteUri = URI.create(String.format( KEEN_BASE_API_PATH + "/projects/%s/events/%s", - projectId, streamToDelete - )); + projectId, streamToDelete)); HttpRequest request = HttpRequest.newBuilder() .uri(deleteUri) @@ -54,11 +77,9 @@ public void eraseStream(String streamToDelete, String projectId, String apiKey, LOGGER.info("Deletes limit exceeded. Sleeping 60 seconds."); Thread.sleep(MINUTE_MILLIS); eraseStream(streamToDelete, projectId, apiKey, true); - } - else { + } else { throw new IllegalStateException(String.format("Could not erase data from stream designed for overriding: " - + "%s. Error message: %s", streamToDelete, response.body() - )); + + "%s. Error message: %s", streamToDelete, response.body())); } } } @@ -68,8 +89,7 @@ public ArrayNode extract(String streamName, String projectId, String apiKey) URI extractionUri = URI.create(String.format( keenBaseApiPath + "/projects/%s/queries/extraction" + "?api_key=%s&timeframe=this_7_years&event_collection=%s", - projectId, apiKey, streamName - )); + projectId, apiKey, streamName)); HttpRequest request = HttpRequest.newBuilder() .uri(extractionUri) @@ -90,8 +110,7 @@ public List getAllCollectionsForProject(String projectId, String apiKey) throws IOException, InterruptedException { URI listCollectionsUri = URI.create(String.format( KEEN_BASE_API_PATH + "/projects/%s/events", - projectId - )); + projectId)); HttpRequest request = HttpRequest.newBuilder() .uri(listCollectionsUri) @@ -103,13 +122,14 @@ public List getAllCollectionsForProject(String projectId, String apiKey) HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); List keenCollections = objectMapper.readValue(objectMapper.createParser(response.body()), - new TypeReference<>(){}); + new TypeReference<>() {}); return keenCollections.stream().map(KeenCollection::getName).collect(Collectors.toList()); } @JsonIgnoreProperties(ignoreUnknown = true) private static class KeenCollection { + private String name; public String getName() { @@ -119,6 +139,7 @@ public String getName() { public void setName(String name) { this.name = name; } + } } diff --git a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenRecordsConsumer.java b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenRecordsConsumer.java index c9a4655e94314..0d1f41d68bb57 100644 --- a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenRecordsConsumer.java +++ b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenRecordsConsumer.java @@ -1,3 +1,27 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package io.airbyte.integrations.destination.keen; import static io.airbyte.integrations.destination.keen.KeenDestination.CONFIG_API_KEY; @@ -120,7 +144,6 @@ private String getStreamName(AirbyteRecordMessage recordMessage) { return streamName; } - @Override protected void close(boolean hasFailed) { kafkaProducer.flush(); diff --git a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java index d4379fba6ea7e..49276776be554 100644 --- a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java +++ b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java @@ -1,3 +1,27 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package io.airbyte.integrations.destination.keen; import com.fasterxml.jackson.databind.JsonNode; @@ -19,13 +43,17 @@ import org.slf4j.LoggerFactory; /** - * This class is used for timestamp inference. Keen leverages use of time-related data for it's analytics, so it's - * important to have timestamp values for historical data if possible. If stream contains cursor field, then its value is - * used as a timestamp, if parsing it is possible. + * This class is used for timestamp inference. Keen leverages use of time-related data for it's + * analytics, so it's important to have timestamp values for historical data if possible. If stream + * contains cursor field, then its value is used as a timestamp, if parsing it is possible. */ public class KeenTimestampService { - public enum CursorType {STRING, NUMBER, UNRECOGNIZED} + public enum CursorType { + STRING, + NUMBER, + UNRECOGNIZED + } private static final Logger LOGGER = LoggerFactory.getLogger(KeenRecordsConsumer.class); @@ -56,11 +84,13 @@ public KeenTimestampService(ConfiguredAirbyteCatalog catalog, boolean timestampI } /** - * Tries to inject keen.timestamp field to the given message data. If the stream contains - * cursor fields of types NUMBER or STRING, this value is tried to be parsed to a timestamp. - * If this procedure fails, stream is removed from timestamp-parsable stream map, so parsing is not tried - * for future messages in the same stream. If parsing succeeds, keen.timestamp field is put as a JSON node - * to the message data and whole data is returned. Otherwise, keen.timestamp is set to emittedAt value + * Tries to inject keen.timestamp field to the given message data. If the stream contains cursor + * fields of types NUMBER or STRING, this value is tried to be parsed to a timestamp. If this + * procedure fails, stream is removed from timestamp-parsable stream map, so parsing is not tried + * for future messages in the same stream. If parsing succeeds, keen.timestamp field is put as a + * JSON node to the message data and whole data is returned. Otherwise, keen.timestamp is set to + * emittedAt value + * * @param message AirbyteRecordMessage containing record data * @return Record data together with keen.timestamp field */ @@ -79,8 +109,7 @@ public JsonNode injectTimestamp(AirbyteRecordMessage message) { streamCursorFields.remove(streamName); injectTimestamp(data, Instant.ofEpochMilli(message.getEmittedAt()).toString()); } - } - else { + } else { injectTimestamp(data, Instant.ofEpochMilli(message.getEmittedAt()).toString()); } return data; @@ -95,7 +124,7 @@ private String parseTimestamp(CursorField cursorField, JsonNode data) { return switch (cursorField.type) { case NUMBER -> dateFromNumber( getNestedNode(data, cursorField.path) - .asLong()); + .asLong()); case STRING -> parser .parse(getNestedNode(data, cursorField.path) .asText()) @@ -123,12 +152,10 @@ public static class CursorField { private static final Set STRING_TYPES = Set.of( "STRING", "CHAR", "NCHAR", "NVARCHAR", "VARCHAR", "LONGVARCHAR", "DATE", - "TIME", "TIMESTAMP" - ); + "TIME", "TIMESTAMP"); private static final Set NUMBER_TYPES = Set.of( "NUMBER", "TINYINT", "SMALLINT", "INT", "INTEGER", "BIGINT", "FLOAT", "DOUBLE", - "REAL", "NUMERIC", "DECIMAL" - ); + "REAL", "NUMERIC", "DECIMAL"); private final List path; private final CursorType type; @@ -171,8 +198,10 @@ private static CursorType getType(String typeString) { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; CursorField that = (CursorField) o; return Objects.equals(path, that.path) && type == that.type; } diff --git a/airbyte-integrations/connectors/destination-keen/src/test-integration/java/io/airbyte/integrations/destination/keen/KeenDestinationTest.java b/airbyte-integrations/connectors/destination-keen/src/test-integration/java/io/airbyte/integrations/destination/keen/KeenDestinationTest.java index 4eee3c5c0cad4..7d3db95b1fab9 100644 --- a/airbyte-integrations/connectors/destination-keen/src/test-integration/java/io/airbyte/integrations/destination/keen/KeenDestinationTest.java +++ b/airbyte-integrations/connectors/destination-keen/src/test-integration/java/io/airbyte/integrations/destination/keen/KeenDestinationTest.java @@ -1,3 +1,27 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package io.airbyte.integrations.destination.keen; import static io.airbyte.integrations.destination.keen.KeenDestination.CONFIG_API_KEY; @@ -10,10 +34,8 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; - import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; - import java.nio.file.Files; import java.nio.file.Path; import java.util.Comparator; @@ -59,11 +81,10 @@ protected List retrieveRecords(TestDestinationEnv testEnv, String stre ArrayNode array = keenHttpClient.extract(accentStrippedStreamName, projectId, apiKey); return Lists.newArrayList(array.elements()).stream() .sorted(Comparator.comparing(o -> o.get("keen").get("timestamp").textValue())) - .map(node -> (JsonNode)((ObjectNode)node).without("keen")) + .map(node -> (JsonNode) ((ObjectNode) node).without("keen")) .collect(Collectors.toList()); } - @Override protected void setup(TestDestinationEnv testEnv) throws Exception { if (!Files.exists(Path.of(SECRET_FILE_PATH))) { @@ -79,7 +100,8 @@ protected void setup(TestDestinationEnv testEnv) throws Exception { @Override protected void tearDown(TestDestinationEnv testEnv) throws Exception { - // Changes for this particular operation - get all collections - can take a couple more time to propagate + // Changes for this particular operation - get all collections - can take a couple more time to + // propagate // than standard queries for the newly created collection Thread.sleep(5000); List keenCollections = keenHttpClient.getAllCollectionsForProject(projectId, apiKey); @@ -93,8 +115,10 @@ protected void tearDown(TestDestinationEnv testEnv) throws Exception { protected void runSyncAndVerifyStateOutput(JsonNode config, List messages, ConfiguredAirbyteCatalog catalog, - boolean runNormalization) throws Exception { + boolean runNormalization) + throws Exception { super.runSyncAndVerifyStateOutput(config, messages, catalog, runNormalization); Thread.sleep(10000); } + } diff --git a/airbyte-integrations/connectors/destination-keen/src/test/java/KeenTimestampServiceTest.java b/airbyte-integrations/connectors/destination-keen/src/test/java/KeenTimestampServiceTest.java index b0f0bbe239ec2..4484a6fff4e66 100644 --- a/airbyte-integrations/connectors/destination-keen/src/test/java/KeenTimestampServiceTest.java +++ b/airbyte-integrations/connectors/destination-keen/src/test/java/KeenTimestampServiceTest.java @@ -1,10 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.keen; + +import static java.util.Map.entry; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; -import io.airbyte.integrations.destination.keen.KeenTimestampService; import io.airbyte.integrations.destination.keen.KeenTimestampService.CursorField; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; @@ -12,15 +39,12 @@ import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import java.util.List; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.stream.Collectors; - -import static java.util.Map.entry; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; public class KeenTimestampServiceTest { @@ -172,12 +196,12 @@ private JsonNode buildExpectedJsonWithTimestamp(T value, String parsedTimest throws JsonProcessingException { return objectMapper.readTree( String.format( - "{" + - "\"cursorProperty\": %s," + - "\"otherProperty\": \"something\"," + - "\"keen\" : { \"timestamp\": \"%s\"}" + - "}", value, parsedTimestamp) - ); + "{" + + "\"cursorProperty\": %s," + + "\"otherProperty\": \"something\"," + + "\"keen\" : { \"timestamp\": \"%s\"}" + + "}", + value, parsedTimestamp)); } private ConfiguredAirbyteCatalog readConfiguredCatalogFromFile(String fileName) diff --git a/airbyte-integrations/connectors/destination-keen/src/test/resources/cursors_catalog.json b/airbyte-integrations/connectors/destination-keen/src/test/resources/cursors_catalog.json index cf7495ed89052..d976e34297f32 100644 --- a/airbyte-integrations/connectors/destination-keen/src/test/resources/cursors_catalog.json +++ b/airbyte-integrations/connectors/destination-keen/src/test/resources/cursors_catalog.json @@ -138,9 +138,9 @@ "properties": { "property1": { "inside": { - "type": "number" + "type": "number" + } } - } } } } diff --git a/airbyte-integrations/connectors/source-chargebee/source_chargebee/streams.py b/airbyte-integrations/connectors/source-chargebee/source_chargebee/streams.py index 5f4d71c08b956..47988002500c5 100644 --- a/airbyte-integrations/connectors/source-chargebee/source_chargebee/streams.py +++ b/airbyte-integrations/connectors/source-chargebee/source_chargebee/streams.py @@ -126,6 +126,7 @@ class SemiIncrementalChargebeeStream(ChargebeeStream): This means that semi incremental streams read all records (like full_refresh streams) but do filtering directly in the code and output only latest records (like incremental streams). """ + cursor_field = "updated_at" def __init__(self, start_date: str): diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/schemas/user_location_report.json b/airbyte-integrations/connectors/source-google-ads/source_google_ads/schemas/user_location_report.json index 99549fa30e9e5..bcf02ad89fc81 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/schemas/user_location_report.json +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/schemas/user_location_report.json @@ -2,46 +2,45 @@ "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": { - "segments.date": { - "type": ["null", "string"], - "format": "datetime" + "type": ["null", "string"], + "format": "datetime" }, "segments.day_of_week": { - "type": ["null", "string"] + "type": ["null", "string"] }, "segments.month": { - "type": ["null", "string"] + "type": ["null", "string"] }, "segments.week": { - "type": ["null", "string"] + "type": ["null", "string"] }, "segments.quarter": { - "type": ["null", "string"] + "type": ["null", "string"] }, "segments.year": { - "type": ["null", "integer"] + "type": ["null", "integer"] }, "segments.ad_network_type": { - "type": ["null", "string"] + "type": ["null", "string"] }, "customer.currency_code": { - "type": ["null", "string"] + "type": ["null", "string"] }, "customer.id": { - "type": ["null", "integer"] + "type": ["null", "integer"] }, "customer.descriptive_name": { - "type": ["null", "string"] + "type": ["null", "string"] }, "customer.time_zone": { - "type": ["null", "string"] + "type": ["null", "string"] }, "user_location_view.country_criterion_id": { - "type": ["null", "string"] + "type": ["null", "string"] }, "user_location_view.resource_name": { - "type": ["null", "string"] + "type": ["null", "string"] }, "campaign.base_campaign": { "type": ["null", "string"] @@ -64,80 +63,80 @@ "ad_group.base_ad_group": { "type": ["null", "string"] }, - "metrics.all_conversions" : { - "type": ["null", "number"] + "metrics.all_conversions": { + "type": ["null", "number"] }, - "metrics.all_conversions_from_interactions_rate" : { - "type": ["null", "number"] + "metrics.all_conversions_from_interactions_rate": { + "type": ["null", "number"] }, - "metrics.all_conversions_value" : { - "type": ["null", "number"] + "metrics.all_conversions_value": { + "type": ["null", "number"] }, - "metrics.average_cost" : { - "type": ["null", "number"] + "metrics.average_cost": { + "type": ["null", "number"] }, - "metrics.average_cpc" : { - "type": ["null", "number"] + "metrics.average_cpc": { + "type": ["null", "number"] }, - "metrics.average_cpm" : { - "type": ["null", "number"] + "metrics.average_cpm": { + "type": ["null", "number"] }, - "metrics.average_cpv" : { - "type": ["null", "number"] + "metrics.average_cpv": { + "type": ["null", "number"] }, - "metrics.clicks" : { - "type": ["null", "number"] + "metrics.clicks": { + "type": ["null", "number"] }, - "metrics.conversions" : { - "type": ["null", "number"] + "metrics.conversions": { + "type": ["null", "number"] }, - "metrics.conversions_from_interactions_rate" : { - "type": ["null", "number"] + "metrics.conversions_from_interactions_rate": { + "type": ["null", "number"] }, - "metrics.conversions_value" : { - "type": ["null", "number"] + "metrics.conversions_value": { + "type": ["null", "number"] }, - "metrics.cost_micros" : { - "type": ["null", "number"] + "metrics.cost_micros": { + "type": ["null", "number"] }, - "metrics.cost_per_all_conversions" : { - "type": ["null", "number"] + "metrics.cost_per_all_conversions": { + "type": ["null", "number"] }, - "metrics.cost_per_conversion" : { - "type": ["null", "number"] + "metrics.cost_per_conversion": { + "type": ["null", "number"] }, - "metrics.cross_device_conversions" : { - "type": ["null", "number"] + "metrics.cross_device_conversions": { + "type": ["null", "number"] }, - "metrics.ctr" : { - "type": ["null", "number"] + "metrics.ctr": { + "type": ["null", "number"] }, - "metrics.impressions" : { - "type": ["null", "number"] + "metrics.impressions": { + "type": ["null", "number"] }, - "metrics.interaction_event_types" : { - "type": ["null", "array"] + "metrics.interaction_event_types": { + "type": ["null", "array"] }, - "metrics.interaction_rate" : { - "type": ["null", "number"] + "metrics.interaction_rate": { + "type": ["null", "number"] }, - "metrics.interactions" : { - "type": ["null", "number"] + "metrics.interactions": { + "type": ["null", "number"] }, - "metrics.value_per_all_conversions" : { - "type": ["null", "number"] + "metrics.value_per_all_conversions": { + "type": ["null", "number"] }, - "metrics.value_per_conversion" : { - "type": ["null", "number"] + "metrics.value_per_conversion": { + "type": ["null", "number"] }, - "metrics.video_view_rate" : { - "type": ["null", "number"] + "metrics.video_view_rate": { + "type": ["null", "number"] }, - "metrics.video_views" : { - "type": ["null", "number"] + "metrics.video_views": { + "type": ["null", "number"] }, - "metrics.view_through_conversions" : { - "type": ["null", "number"] + "metrics.view_through_conversions": { + "type": ["null", "number"] } } }