From 04af428bf24eed0db18f3d948a463cd04e1b4137 Mon Sep 17 00:00:00 2001 From: Erick Corona Date: Tue, 13 Aug 2024 15:27:43 -0600 Subject: [PATCH 01/13] Enable all file types for GCS --- .../integration_tests/configured_catalog.json | 9 +++++++++ .../connectors/source-gcs/source_gcs/config.py | 12 ++++++------ .../source-gcs/source_gcs/stream_reader.py | 5 +++-- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json index b772480244ee2..976f67d776d72 100644 --- a/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json @@ -1,5 +1,14 @@ { "streams": [ + { + "stream": { + "name": "example_1_jsonl", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" + }, { "stream": { "name": "example_1", diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/config.py b/airbyte-integrations/connectors/source-gcs/source_gcs/config.py index 181a85d92127b..c9ef6300ffcec 100644 --- a/airbyte-integrations/connectors/source-gcs/source_gcs/config.py +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/config.py @@ -19,12 +19,12 @@ class SourceGCSStreamConfig(FileBasedStreamConfig): 'pattern matching look here.', order=1, ) - format: CsvFormat = Field( - title="Format", - description="The configuration options that are used to alter how to read incoming files that deviate from " - "the standard formatting.", - order=2, - ) + # format: CsvFormat = Field( + # title="Format", + # description="The configuration options that are used to alter how to read incoming files that deviate from " + # "the standard formatting.", + # order=2, + # ) legacy_prefix: Optional[str] = Field( title="Legacy Prefix", description="The path prefix configured in previous versions of the GCS connector. " diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py b/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py index c290dbe70aacf..48f12c433fd46 100644 --- a/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py @@ -9,6 +9,7 @@ from datetime import datetime, timedelta from io import IOBase from typing import Iterable, List, Optional +import re import pytz import smart_open @@ -23,7 +24,7 @@ "We don't have access to {uri}. The file appears to have become unreachable during sync." 
"Check whether key {uri} exists in `{bucket}` bucket and/or has proper ACL permissions" ) -FILE_FORMAT = "csv" # TODO: Change if other file formats are implemented +FILE_FORMATS = "avro|csv|jsonl|tsv|parquet" class SourceGCSStreamReader(AbstractFileBasedStreamReader): @@ -77,7 +78,7 @@ def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: lo for blob in blobs: last_modified = blob.updated.astimezone(pytz.utc).replace(tzinfo=None) - if FILE_FORMAT in blob.name.lower() and (not start_date or last_modified >= start_date): + if re.search(FILE_FORMATS, blob.name.lower()) and (not start_date or last_modified >= start_date): uri = blob.generate_signed_url(expiration=timedelta(hours=1), version="v4") file_extension = ".".join(blob.name.split(".")[1:]) From 7db701c4da3cdc2360f3d48476a536df12d2515d Mon Sep 17 00:00:00 2001 From: Erick Corona Date: Tue, 13 Aug 2024 15:41:12 -0600 Subject: [PATCH 02/13] Bump minor version, add changelog entry --- .../connectors/source-gcs/metadata.yaml | 2 +- .../connectors/source-gcs/pyproject.toml | 2 +- docs/integrations/sources/gcs.md | 49 ++++++++++--------- 3 files changed, 27 insertions(+), 26 deletions(-) diff --git a/airbyte-integrations/connectors/source-gcs/metadata.yaml b/airbyte-integrations/connectors/source-gcs/metadata.yaml index 383ad8ff05dbd..a17b501139045 100644 --- a/airbyte-integrations/connectors/source-gcs/metadata.yaml +++ b/airbyte-integrations/connectors/source-gcs/metadata.yaml @@ -7,7 +7,7 @@ data: connectorSubtype: file connectorType: source definitionId: 2a8c41ae-8c23-4be0-a73f-2ab10ca1a820 - dockerImageTag: 0.4.15 + dockerImageTag: 0.5.0 dockerRepository: airbyte/source-gcs documentationUrl: https://docs.airbyte.com/integrations/sources/gcs githubIssueLabel: source-gcs diff --git a/airbyte-integrations/connectors/source-gcs/pyproject.toml b/airbyte-integrations/connectors/source-gcs/pyproject.toml index 359a6ca7fe412..cd7a2750e19ca 100644 --- a/airbyte-integrations/connectors/source-gcs/pyproject.toml +++ b/airbyte-integrations/connectors/source-gcs/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "0.4.15" +version = "0.5.0" name = "source-gcs" description = "Source implementation for Gcs." authors = [ "Airbyte ",] diff --git a/docs/integrations/sources/gcs.md b/docs/integrations/sources/gcs.md index 18574a7476a54..3e1ddeee9a842 100644 --- a/docs/integrations/sources/gcs.md +++ b/docs/integrations/sources/gcs.md @@ -151,31 +151,32 @@ Leaving this field blank (default option) will disallow escaping. 
| Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------| -| 0.4.15 | 2024-08-12 | [43733](https://github.com/airbytehq/airbyte/pull/43733) | Update dependencies | -| 0.4.14 | 2024-08-10 | [43512](https://github.com/airbytehq/airbyte/pull/43512) | Update dependencies | -| 0.4.13 | 2024-08-03 | [43236](https://github.com/airbytehq/airbyte/pull/43236) | Update dependencies | -| 0.4.12 | 2024-07-27 | [42693](https://github.com/airbytehq/airbyte/pull/42693) | Update dependencies | -| 0.4.11 | 2024-07-20 | [42312](https://github.com/airbytehq/airbyte/pull/42312) | Update dependencies | -| 0.4.10 | 2024-07-13 | [41865](https://github.com/airbytehq/airbyte/pull/41865) | Update dependencies | -| 0.4.9 | 2024-07-10 | [41430](https://github.com/airbytehq/airbyte/pull/41430) | Update dependencies | -| 0.4.8 | 2024-07-09 | [41148](https://github.com/airbytehq/airbyte/pull/41148) | Update dependencies | -| 0.4.7 | 2024-07-06 | [41015](https://github.com/airbytehq/airbyte/pull/41015) | Update dependencies | -| 0.4.6 | 2024-06-26 | [40540](https://github.com/airbytehq/airbyte/pull/40540) | Update dependencies | -| 0.4.5 | 2024-06-25 | [40391](https://github.com/airbytehq/airbyte/pull/40391) | Update dependencies | -| 0.4.4 | 2024-06-24 | [40234](https://github.com/airbytehq/airbyte/pull/40234) | Update dependencies | -| 0.4.3 | 2024-06-22 | [40089](https://github.com/airbytehq/airbyte/pull/40089) | Update dependencies | -| 0.4.2 | 2024-06-06 | [39255](https://github.com/airbytehq/airbyte/pull/39255) | [autopull] Upgrade base image to v1.2.2 | -| 0.4.1 | 2024-05-29 | [38696](https://github.com/airbytehq/airbyte/pull/38696) | Avoid error on empty stream when running discover | +| 0.5.0 | 2024-08-13 | [44015](https://github.com/airbytehq/airbyte/pull/44015) | Add support for all FileBased file types | +| 0.4.15 | 2024-08-12 | [43733](https://github.com/airbytehq/airbyte/pull/43733) | Update dependencies | +| 0.4.14 | 2024-08-10 | [43512](https://github.com/airbytehq/airbyte/pull/43512) | Update dependencies | +| 0.4.13 | 2024-08-03 | [43236](https://github.com/airbytehq/airbyte/pull/43236) | Update dependencies | +| 0.4.12 | 2024-07-27 | [42693](https://github.com/airbytehq/airbyte/pull/42693) | Update dependencies | +| 0.4.11 | 2024-07-20 | [42312](https://github.com/airbytehq/airbyte/pull/42312) | Update dependencies | +| 0.4.10 | 2024-07-13 | [41865](https://github.com/airbytehq/airbyte/pull/41865) | Update dependencies | +| 0.4.9 | 2024-07-10 | [41430](https://github.com/airbytehq/airbyte/pull/41430) | Update dependencies | +| 0.4.8 | 2024-07-09 | [41148](https://github.com/airbytehq/airbyte/pull/41148) | Update dependencies | +| 0.4.7 | 2024-07-06 | [41015](https://github.com/airbytehq/airbyte/pull/41015) | Update dependencies | +| 0.4.6 | 2024-06-26 | [40540](https://github.com/airbytehq/airbyte/pull/40540) | Update dependencies | +| 0.4.5 | 2024-06-25 | [40391](https://github.com/airbytehq/airbyte/pull/40391) | Update dependencies | +| 0.4.4 | 2024-06-24 | [40234](https://github.com/airbytehq/airbyte/pull/40234) | Update dependencies | +| 0.4.3 | 2024-06-22 | [40089](https://github.com/airbytehq/airbyte/pull/40089) | Update dependencies | +| 0.4.2 | 2024-06-06 | [39255](https://github.com/airbytehq/airbyte/pull/39255) | [autopull] Upgrade 
base image to v1.2.2 | +| 0.4.1 | 2024-05-29 | [38696](https://github.com/airbytehq/airbyte/pull/38696) | Avoid error on empty stream when running discover | | 0.4.0 | 2024-03-21 | [36373](https://github.com/airbytehq/airbyte/pull/36373) | Add Gzip and Bzip compression support. Manage dependencies with Poetry. | -| 0.3.7 | 2024-02-06 | [34936](https://github.com/airbytehq/airbyte/pull/34936) | Bump CDK version to avoid missing SyncMode errors | -| 0.3.6 | 2024-01-30 | [34681](https://github.com/airbytehq/airbyte/pull/34681) | Unpin CDK version to make compatible with the Concurrent CDK | +| 0.3.7 | 2024-02-06 | [34936](https://github.com/airbytehq/airbyte/pull/34936) | Bump CDK version to avoid missing SyncMode errors | +| 0.3.6 | 2024-01-30 | [34681](https://github.com/airbytehq/airbyte/pull/34681) | Unpin CDK version to make compatible with the Concurrent CDK | | 0.3.5 | 2024-01-30 | [34661](https://github.com/airbytehq/airbyte/pull/34661) | Pin CDK version until upgrade for compatibility with the Concurrent CDK | -| 0.3.4 | 2024-01-11 | [34158](https://github.com/airbytehq/airbyte/pull/34158) | Fix issue in stream reader for document file type parser | -| 0.3.3 | 2023-12-06 | [33187](https://github.com/airbytehq/airbyte/pull/33187) | Bump CDK version to hide source-defined primary key | -| 0.3.2 | 2023-11-16 | [32608](https://github.com/airbytehq/airbyte/pull/32608) | Improve document file type parser | -| 0.3.1 | 2023-11-13 | [32357](https://github.com/airbytehq/airbyte/pull/32357) | Improve spec schema | -| 0.3.0 | 2023-10-11 | [31212](https://github.com/airbytehq/airbyte/pull/31212) | Migrated to file based CDK | -| 0.2.0 | 2023-06-26 | [27725](https://github.com/airbytehq/airbyte/pull/27725) | License Update: Elv2 | -| 0.1.0 | 2023-02-16 | [23186](https://github.com/airbytehq/airbyte/pull/23186) | New Source: GCS | +| 0.3.4 | 2024-01-11 | [34158](https://github.com/airbytehq/airbyte/pull/34158) | Fix issue in stream reader for document file type parser | +| 0.3.3 | 2023-12-06 | [33187](https://github.com/airbytehq/airbyte/pull/33187) | Bump CDK version to hide source-defined primary key | +| 0.3.2 | 2023-11-16 | [32608](https://github.com/airbytehq/airbyte/pull/32608) | Improve document file type parser | +| 0.3.1 | 2023-11-13 | [32357](https://github.com/airbytehq/airbyte/pull/32357) | Improve spec schema | +| 0.3.0 | 2023-10-11 | [31212](https://github.com/airbytehq/airbyte/pull/31212) | Migrated to file based CDK | +| 0.2.0 | 2023-06-26 | [27725](https://github.com/airbytehq/airbyte/pull/27725) | License Update: Elv2 | +| 0.1.0 | 2023-02-16 | [23186](https://github.com/airbytehq/airbyte/pull/23186) | New Source: GCS | From e9506d4c3817a039103496beadcfdea7cc73a39d Mon Sep 17 00:00:00 2001 From: Erick Corona Date: Wed, 14 Aug 2024 14:32:19 -0600 Subject: [PATCH 03/13] Remove SourceGCSStreamConfig from streams field --- .../integration_tests/abnormal_state.json | 14 + .../source-gcs/integration_tests/spec.json | 319 +++++++++++++++++- .../source-gcs/source_gcs/config.py | 47 ++- 3 files changed, 349 insertions(+), 31 deletions(-) diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json index dd226c97b89e3..59a7542e6e18a 100644 --- a/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json +++ 
b/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json @@ -13,6 +13,20 @@ } } }, + { + "type": "STREAM", + "stream": { + "stream_state": { + "_ab_source_file_last_modified": "2023-02-27T10:34:32.664000Z_https://storage.googleapis.com/airbyte-integration-test-source-gcs/jsonl_test/example_1.jsonl", + "history": { + "https://storage.googleapis.com/airbyte-integration-test-source-gcs/jsonl_test/example_1.jsonl": "2023-02-27T10:34:32.664000Z" + } + }, + "stream_descriptor": { + "name": "example_1" + } + } + }, { "type": "STREAM", "stream": { diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json b/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json index f0b1dad433a4b..9e18256d1ca0a 100644 --- a/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json @@ -21,7 +21,7 @@ "order": 3, "type": "array", "items": { - "title": "SourceGCSStreamConfig", + "title": "FileBasedStreamConfig", "type": "object", "properties": { "name": { @@ -33,9 +33,12 @@ "globs": { "title": "Globs", "description": "The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look here.", + "default": ["**"], "order": 1, "type": "array", - "items": { "type": "string" } + "items": { + "type": "string" + } }, "legacy_prefix": { "title": "Legacy Prefix", @@ -72,6 +75,25 @@ "order": 2, "type": "object", "oneOf": [ + { + "title": "Avro Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "avro", + "const": "avro", + "type": "string" + }, + "double_as_string": { + "title": "Convert Double Fields to Strings", + "description": "Whether to convert double fields to strings. This is recommended if you have decimal numbers with a high degree of precision because there can be a loss precision when handling floating point numbers.", + "default": false, + "type": "boolean" + } + }, + "required": ["filetype"] + }, { "title": "CSV Format", "type": "object", @@ -116,7 +138,9 @@ "description": "A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.", "default": [], "type": "array", - "items": { "type": "string" }, + "items": { + "type": "string" + }, "uniqueItems": true }, "strings_can_be_null": { @@ -140,7 +164,9 @@ "header_definition": { "title": "CSV Header Definition", "description": "How headers will be defined. `User Provided` assumes the CSV does not have a header row and uses the headers provided and `Autogenerated` assumes the CSV does not have a header row and the CDK will generate headers using for `f{i}` where `i` is the index starting from 0. Else, the default behavior is to use the header from the CSV file. 
If a user wants to autogenerate or provide column names for a CSV having headers, they can skip rows.", - "default": { "header_definition_type": "From CSV" }, + "default": { + "header_definition_type": "From CSV" + }, "oneOf": [ { "title": "From CSV", @@ -182,7 +208,9 @@ "title": "Column Names", "description": "The column names that will be used while emitting the CSV records", "type": "array", - "items": { "type": "string" } + "items": { + "type": "string" + } } }, "required": ["column_names", "header_definition_type"] @@ -195,7 +223,9 @@ "description": "A set of case-sensitive strings that should be interpreted as true values.", "default": ["y", "yes", "t", "true", "on", "1"], "type": "array", - "items": { "type": "string" }, + "items": { + "type": "string" + }, "uniqueItems": true }, "false_values": { @@ -203,7 +233,9 @@ "description": "A set of case-sensitive strings that should be interpreted as false values.", "default": ["n", "no", "f", "false", "off", "0"], "type": "array", - "items": { "type": "string" }, + "items": { + "type": "string" + }, "uniqueItems": true }, "inference_type": { @@ -221,6 +253,93 @@ } }, "required": ["filetype"] + }, + { + "title": "Jsonl Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "jsonl", + "const": "jsonl", + "type": "string" + } + }, + "required": ["filetype"] + }, + { + "title": "Parquet Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "parquet", + "const": "parquet", + "type": "string" + }, + "decimal_as_float": { + "title": "Convert Decimal Fields to Floats", + "description": "Whether to convert decimal fields to floats. There is a loss of precision when converting decimals to floats, so this is not recommended.", + "default": false, + "type": "boolean" + } + }, + "required": ["filetype"] + }, + { + "title": "Unstructured Document Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "unstructured", + "const": "unstructured", + "type": "string" + }, + "skip_unprocessable_files": { + "title": "Skip Unprocessable Files", + "description": "If true, skip files that cannot be parsed and pass the error message along as the _ab_source_file_parse_error field. If false, fail the sync.", + "default": true, + "always_show": true, + "type": "boolean" + }, + "strategy": { + "title": "Parsing Strategy", + "description": "The strategy used to parse documents. `fast` extracts text directly from the document which doesn't work for all files. `ocr_only` is more reliable, but slower. `hi_res` is the most reliable, but requires an API key and a hosted instance of unstructured and can't be used with local mode. See the unstructured.io documentation for more details: https://unstructured-io.github.io/unstructured/core/partition.html#partition-pdf", + "default": "auto", + "always_show": true, + "order": 0, + "enum": ["auto", "fast", "ocr_only", "hi_res"], + "type": "string" + }, + "processing": { + "title": "Processing", + "description": "Processing configuration", + "default": { + "mode": "local" + }, + "type": "object", + "oneOf": [ + { + "title": "Local", + "type": "object", + "properties": { + "mode": { + "title": "Mode", + "default": "local", + "const": "local", + "enum": ["local"], + "type": "string" + } + }, + "description": "Process files locally, supporting `fast` and `ocr` modes. 
This is the default option.", + "required": ["mode"] + } + ] + } + }, + "description": "Extract text from document formats (.pdf, .docx, .md, .pptx) and emit as one record per file.", + "required": ["filetype"] } ] }, @@ -229,11 +348,197 @@ "description": "When enabled, syncs will not validate or structure records against the stream's schema.", "default": false, "type": "boolean" + }, + "recent_n_files_to_read_for_schema_discovery": { + "title": "Files To Read For Schema Discover", + "description": "The number of recent files which will be used to discover the schema for this stream.", + "exclusiveMinimum": 0, + "type": "integer" } }, "required": ["name", "format"] } }, + "format": { + "title": "File Format", + "description": "Deprecated and will be removed soon. Please do not use this field anymore and use streams.format instead. The format of the files you'd like to replicate", + "default": "csv", + "order": 120, + "type": "object", + "oneOf": [ + { + "title": "CSV", + "description": "This connector utilises PyArrow (Apache Arrow) for CSV parsing.", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "csv", + "const": "csv", + "type": "string" + }, + "delimiter": { + "title": "Delimiter", + "description": "The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.", + "default": ",", + "minLength": 1, + "order": 0, + "type": "string" + }, + "infer_datatypes": { + "title": "Infer Datatypes", + "description": "Configures whether a schema for the source should be inferred from the current data or not. If set to false and a custom schema is set, then the manually enforced schema is used. If a schema is not manually set, and this is set to false, then all fields will be read as strings", + "default": true, + "order": 1, + "type": "boolean" + }, + "quote_char": { + "title": "Quote Character", + "description": "The character used for quoting CSV values. To disallow quoting, make this field blank.", + "default": "\"", + "order": 2, + "type": "string" + }, + "escape_char": { + "title": "Escape Character", + "description": "The character used for escaping special characters. To disallow escaping, leave this field blank.", + "order": 3, + "type": "string" + }, + "encoding": { + "title": "Encoding", + "description": "The character encoding of the CSV data. Leave blank to default to UTF8. See list of python encodings for allowable options.", + "default": "utf8", + "order": 4, + "type": "string" + }, + "double_quote": { + "title": "Double Quote", + "description": "Whether two quotes in a quoted CSV value denote a single quote in the data.", + "default": true, + "order": 5, + "type": "boolean" + }, + "newlines_in_values": { + "title": "Allow newlines in values", + "description": "Whether newline characters are allowed in CSV values. Turning this on may affect performance. Leave blank to default to False.", + "default": false, + "order": 6, + "type": "boolean" + }, + "additional_reader_options": { + "title": "Additional Reader Options", + "description": "Optionally add a valid JSON string here to provide additional options to the csv reader. Mappings must correspond to options detailed here. 
'column_types' is used internally to handle schema so overriding that would likely cause problems.", + "examples": [ + "{\"timestamp_parsers\": [\"%m/%d/%Y %H:%M\", \"%Y/%m/%d %H:%M\"], \"strings_can_be_null\": true, \"null_values\": [\"NA\", \"NULL\"]}" + ], + "order": 7, + "type": "string" + }, + "advanced_options": { + "title": "Advanced Options", + "description": "Optionally add a valid JSON string here to provide additional Pyarrow ReadOptions. Specify 'column_names' here if your CSV doesn't have header, or if you want to use custom column names. 'block_size' and 'encoding' are already used above, specify them again here will override the values above.", + "examples": ["{\"column_names\": [\"column1\", \"column2\"]}"], + "order": 8, + "type": "string" + }, + "block_size": { + "title": "Block Size", + "description": "The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.", + "default": 10000, + "minimum": 1, + "maximum": 2147483647, + "order": 9, + "type": "integer" + } + } + }, + { + "title": "Parquet", + "description": "This connector utilises PyArrow (Apache Arrow) for Parquet parsing.", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "parquet", + "const": "parquet", + "type": "string" + }, + "columns": { + "title": "Selected Columns", + "description": "If you only want to sync a subset of the columns from the file(s), add the columns you want here as a comma-delimited list. Leave it empty to sync all columns.", + "order": 0, + "type": "array", + "items": { + "type": "string" + } + }, + "batch_size": { + "title": "Record batch size", + "description": "Maximum number of records per batch read from the input files. Batches may be smaller if there aren’t enough rows in the file. This option can help avoid out-of-memory errors if your data is particularly wide.", + "default": 65536, + "order": 1, + "type": "integer" + }, + "buffer_size": { + "title": "Buffer Size", + "description": "Perform read buffering when deserializing individual column chunks. By default every group column will be loaded fully to memory. This option can help avoid out-of-memory errors if your data is particularly wide.", + "default": 2, + "type": "integer" + } + } + }, + { + "title": "Avro", + "description": "This connector utilises fastavro for Avro parsing.", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "avro", + "const": "avro", + "type": "string" + } + } + }, + { + "title": "Jsonl", + "description": "This connector uses PyArrow for JSON Lines (jsonl) file parsing.", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "jsonl", + "const": "jsonl", + "type": "string" + }, + "newlines_in_values": { + "title": "Allow newlines in values", + "description": "Whether newline characters are allowed in JSON values. Turning this on may affect performance. Leave blank to default to False.", + "default": false, + "order": 0, + "type": "boolean" + }, + "unexpected_field_behavior": { + "title": "Unexpected field behavior", + "description": "How JSON fields outside of explicit_schema (if given) are treated. 
Check PyArrow documentation for details", + "default": "infer", + "examples": ["ignore", "infer", "error"], + "order": 1, + "enum": ["ignore", "infer", "error"] + }, + "block_size": { + "title": "Block Size", + "description": "The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.", + "default": 0, + "order": 2, + "type": "integer" + } + } + } + ], + "airbyte_hidden": true + }, "service_account": { "title": "Service Account Information", "description": "Enter your Google Cloud service account key in JSON format", diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/config.py b/airbyte-integrations/connectors/source-gcs/source_gcs/config.py index c9ef6300ffcec..6c30d54e68e80 100644 --- a/airbyte-integrations/connectors/source-gcs/source_gcs/config.py +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/config.py @@ -6,7 +6,6 @@ from typing import List, Optional from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec -from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig from pydantic import AnyUrl, Field @@ -53,16 +52,16 @@ class Config(AbstractFileBasedSpec): bucket: str = Field(title="Bucket", description="Name of the GCS bucket where the file(s) exist.", order=2) - streams: List[SourceGCSStreamConfig] = Field( - title="The list of streams to sync", - description=( - "Each instance of this configuration defines a stream. " - "Use this to define which files belong in the stream, their format, and how they should be " - "parsed and validated. When sending data to warehouse destination such as Snowflake or " - "BigQuery, each stream is a separate table." - ), - order=3, - ) + # streams: List[SourceGCSStreamConfig] = Field( + # title="The list of streams to sync", + # description=( + # "Each instance of this configuration defines a stream. " + # "Use this to define which files belong in the stream, their format, and how they should be " + # "parsed and validated. When sending data to warehouse destination such as Snowflake or " + # "BigQuery, each stream is a separate table." + # ), + # order=3, + # ) @classmethod def documentation_url(cls) -> AnyUrl: @@ -71,17 +70,17 @@ def documentation_url(cls) -> AnyUrl: """ return AnyUrl("https://docs.airbyte.com/integrations/sources/gcs", scheme="https") - @staticmethod - def replace_enum_allOf_and_anyOf(schema): - """ - Replace allOf with anyOf when appropriate in the schema with one value. - """ - objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"] - if len(objects_to_check.get("allOf", [])) == 1: - objects_to_check["anyOf"] = objects_to_check.pop("allOf") - - return super(Config, Config).replace_enum_allOf_and_anyOf(schema) + # @staticmethod + # def replace_enum_allOf_and_anyOf(schema): + # """ + # Replace allOf with anyOf when appropriate in the schema with one value. 
+ # """ + # objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"] + # if len(objects_to_check.get("allOf", [])) == 1: + # objects_to_check["anyOf"] = objects_to_check.pop("allOf") + # + # return super(Config, Config).replace_enum_allOf_and_anyOf(schema) - @staticmethod - def remove_discriminator(schema) -> None: - pass + # @staticmethod + # def remove_discriminator(schema) -> None: + # pass From 6a50b768f3cc013bdfbd5c4c14c48122b4834729 Mon Sep 17 00:00:00 2001 From: Erick Corona Date: Wed, 14 Aug 2024 15:13:44 -0600 Subject: [PATCH 04/13] Update spec.json, apply formatting --- .../source-gcs/integration_tests/spec.json | 934 ++++++++---------- .../source-gcs/source_gcs/stream_reader.py | 2 +- 2 files changed, 407 insertions(+), 529 deletions(-) diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json b/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json index 9e18256d1ca0a..38797e7f26ce3 100644 --- a/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json @@ -1,558 +1,436 @@ { - "documentationUrl": "https://docs.airbyte.com/integrations/sources/gcs", - "connectionSpecification": { - "title": "Config", - "description": "NOTE: When this Spec is changed, legacy_config_transformer.py must also be\nmodified to uptake the changes because it is responsible for converting\nlegacy GCS configs into file based configs using the File-Based CDK.", - "type": "object", - "properties": { - "start_date": { - "title": "Start Date", - "description": "UTC date and time in the format 2017-01-25T00:00:00.000000Z. Any file modified before this date will not be replicated.", - "examples": ["2021-01-01T00:00:00.000000Z"], - "format": "date-time", - "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}Z$", - "pattern_descriptor": "YYYY-MM-DDTHH:mm:ss.SSSSSSZ", - "order": 1, - "type": "string" - }, - "streams": { - "title": "The list of streams to sync", - "description": "Each instance of this configuration defines a stream. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.", - "order": 3, - "type": "array", - "items": { - "title": "FileBasedStreamConfig", - "type": "object", - "properties": { - "name": { - "title": "Name", - "description": "The name of the stream.", - "order": 0, - "type": "string" - }, - "globs": { - "title": "Globs", - "description": "The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look here.", - "default": ["**"], - "order": 1, - "type": "array", - "items": { + "type": "SPEC", + "spec": { + "documentationUrl": "https://docs.airbyte.com/integrations/sources/gcs", + "connectionSpecification": { + "title": "Config", + "description": "NOTE: When this Spec is changed, legacy_config_transformer.py must also be\nmodified to uptake the changes because it is responsible for converting\nlegacy GCS configs into file based configs using the File-Based CDK.", + "type": "object", + "properties": { + "start_date": { + "title": "Start Date", + "description": "UTC date and time in the format 2017-01-25T00:00:00.000000Z. 
Any file modified before this date will not be replicated.", + "examples": ["2021-01-01T00:00:00.000000Z"], + "format": "date-time", + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}Z$", + "pattern_descriptor": "YYYY-MM-DDTHH:mm:ss.SSSSSSZ", + "order": 1, + "type": "string" + }, + "streams": { + "title": "The list of streams to sync", + "description": "Each instance of this configuration defines a stream. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.", + "order": 10, + "type": "array", + "items": { + "title": "FileBasedStreamConfig", + "type": "object", + "properties": { + "name": { + "title": "Name", + "description": "The name of the stream.", "type": "string" - } - }, - "legacy_prefix": { - "title": "Legacy Prefix", - "description": "The path prefix configured in previous versions of the GCS connector. This option is deprecated in favor of a single glob.", - "airbyte_hidden": true, - "type": "string" - }, - "validation_policy": { - "title": "Validation Policy", - "description": "The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.", - "default": "Emit Record", - "enum": ["Emit Record", "Skip Record", "Wait for Discover"] - }, - "input_schema": { - "title": "Input Schema", - "description": "The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.", - "type": "string" - }, - "primary_key": { - "title": "Primary Key", - "description": "The column or columns (for a composite key) that serves as the unique identifier of a record. If empty, the primary key will default to the parser's default primary key.", - "airbyte_hidden": true, - "type": "string" - }, - "days_to_sync_if_history_is_full": { - "title": "Days To Sync If History Is Full", - "description": "When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.", - "default": 3, - "type": "integer" - }, - "format": { - "title": "Format", - "description": "The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.", - "order": 2, - "type": "object", - "oneOf": [ - { - "title": "Avro Format", - "type": "object", - "properties": { - "filetype": { - "title": "Filetype", - "default": "avro", - "const": "avro", - "type": "string" + }, + "globs": { + "title": "Globs", + "description": "The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look here.", + "default": ["**"], + "order": 1, + "type": "array", + "items": { + "type": "string" + } + }, + "legacy_prefix": { + "title": "Legacy Prefix", + "description": "The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.", + "airbyte_hidden": true, + "type": "string" + }, + "validation_policy": { + "title": "Validation Policy", + "description": "The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.", + "default": "Emit Record", + "enum": ["Emit Record", "Skip Record", "Wait for Discover"] + }, + "input_schema": { + "title": "Input Schema", + "description": "The schema that will be used to validate records extracted from the file. 
This will override the stream schema that is auto-detected from incoming files.", + "type": "string" + }, + "primary_key": { + "title": "Primary Key", + "description": "The column or columns (for a composite key) that serves as the unique identifier of a record. If empty, the primary key will default to the parser's default primary key.", + "airbyte_hidden": true, + "type": "string" + }, + "days_to_sync_if_history_is_full": { + "title": "Days To Sync If History Is Full", + "description": "When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.", + "default": 3, + "type": "integer" + }, + "format": { + "title": "Format", + "description": "The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.", + "type": "object", + "oneOf": [ + { + "title": "Avro Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "avro", + "const": "avro", + "type": "string" + }, + "double_as_string": { + "title": "Convert Double Fields to Strings", + "description": "Whether to convert double fields to strings. This is recommended if you have decimal numbers with a high degree of precision because there can be a loss precision when handling floating point numbers.", + "default": false, + "type": "boolean" + } }, - "double_as_string": { - "title": "Convert Double Fields to Strings", - "description": "Whether to convert double fields to strings. This is recommended if you have decimal numbers with a high degree of precision because there can be a loss precision when handling floating point numbers.", - "default": false, - "type": "boolean" - } + "required": ["filetype"] }, - "required": ["filetype"] - }, - { - "title": "CSV Format", - "type": "object", - "properties": { - "filetype": { - "title": "Filetype", - "default": "csv", - "const": "csv", - "type": "string" - }, - "delimiter": { - "title": "Delimiter", - "description": "The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.", - "default": ",", - "type": "string" - }, - "quote_char": { - "title": "Quote Character", - "description": "The character used for quoting CSV values. To disallow quoting, make this field blank.", - "default": "\"", - "type": "string" - }, - "escape_char": { - "title": "Escape Character", - "description": "The character used for escaping special characters. To disallow escaping, leave this field blank.", - "type": "string" - }, - "encoding": { - "title": "Encoding", - "description": "The character encoding of the CSV data. Leave blank to default to UTF8. See list of python encodings for allowable options.", - "default": "utf8", - "type": "string" - }, - "double_quote": { - "title": "Double Quote", - "description": "Whether two quotes in a quoted CSV value denote a single quote in the data.", - "default": true, - "type": "boolean" - }, - "null_values": { - "title": "Null Values", - "description": "A set of case-sensitive strings that should be interpreted as null values. 
For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.", - "default": [], - "type": "array", - "items": { + { + "title": "CSV Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "csv", + "const": "csv", "type": "string" }, - "uniqueItems": true - }, - "strings_can_be_null": { - "title": "Strings Can Be Null", - "description": "Whether strings can be interpreted as null values. If true, strings that match the null_values set will be interpreted as null. If false, strings that match the null_values set will be interpreted as the string itself.", - "default": true, - "type": "boolean" - }, - "skip_rows_before_header": { - "title": "Skip Rows Before Header", - "description": "The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.", - "default": 0, - "type": "integer" - }, - "skip_rows_after_header": { - "title": "Skip Rows After Header", - "description": "The number of rows to skip after the header row.", - "default": 0, - "type": "integer" - }, - "header_definition": { - "title": "CSV Header Definition", - "description": "How headers will be defined. `User Provided` assumes the CSV does not have a header row and uses the headers provided and `Autogenerated` assumes the CSV does not have a header row and the CDK will generate headers using for `f{i}` where `i` is the index starting from 0. Else, the default behavior is to use the header from the CSV file. If a user wants to autogenerate or provide column names for a CSV having headers, they can skip rows.", - "default": { - "header_definition_type": "From CSV" + "delimiter": { + "title": "Delimiter", + "description": "The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.", + "default": ",", + "type": "string" }, - "oneOf": [ - { - "title": "From CSV", - "type": "object", - "properties": { - "header_definition_type": { - "title": "Header Definition Type", - "default": "From CSV", - "const": "From CSV", - "type": "string" - } - }, - "required": ["header_definition_type"] + "quote_char": { + "title": "Quote Character", + "description": "The character used for quoting CSV values. To disallow quoting, make this field blank.", + "default": "\"", + "type": "string" + }, + "escape_char": { + "title": "Escape Character", + "description": "The character used for escaping special characters. To disallow escaping, leave this field blank.", + "type": "string" + }, + "encoding": { + "title": "Encoding", + "description": "The character encoding of the CSV data. Leave blank to default to UTF8. See list of python encodings for allowable options.", + "default": "utf8", + "type": "string" + }, + "double_quote": { + "title": "Double Quote", + "description": "Whether two quotes in a quoted CSV value denote a single quote in the data.", + "default": true, + "type": "boolean" + }, + "null_values": { + "title": "Null Values", + "description": "A set of case-sensitive strings that should be interpreted as null values. 
For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.", + "default": [], + "type": "array", + "items": { + "type": "string" }, - { - "title": "Autogenerated", - "type": "object", - "properties": { - "header_definition_type": { - "title": "Header Definition Type", - "default": "Autogenerated", - "const": "Autogenerated", - "type": "string" - } - }, - "required": ["header_definition_type"] + "uniqueItems": true + }, + "strings_can_be_null": { + "title": "Strings Can Be Null", + "description": "Whether strings can be interpreted as null values. If true, strings that match the null_values set will be interpreted as null. If false, strings that match the null_values set will be interpreted as the string itself.", + "default": true, + "type": "boolean" + }, + "skip_rows_before_header": { + "title": "Skip Rows Before Header", + "description": "The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.", + "default": 0, + "type": "integer" + }, + "skip_rows_after_header": { + "title": "Skip Rows After Header", + "description": "The number of rows to skip after the header row.", + "default": 0, + "type": "integer" + }, + "header_definition": { + "title": "CSV Header Definition", + "description": "How headers will be defined. `User Provided` assumes the CSV does not have a header row and uses the headers provided and `Autogenerated` assumes the CSV does not have a header row and the CDK will generate headers using for `f{i}` where `i` is the index starting from 0. Else, the default behavior is to use the header from the CSV file. If a user wants to autogenerate or provide column names for a CSV having headers, they can skip rows.", + "default": { + "header_definition_type": "From CSV" }, - { - "title": "User Provided", - "type": "object", - "properties": { - "header_definition_type": { - "title": "Header Definition Type", - "default": "User Provided", - "const": "User Provided", - "type": "string" + "oneOf": [ + { + "title": "From CSV", + "type": "object", + "properties": { + "header_definition_type": { + "title": "Header Definition Type", + "default": "From CSV", + "const": "From CSV", + "type": "string" + } }, - "column_names": { - "title": "Column Names", - "description": "The column names that will be used while emitting the CSV records", - "type": "array", - "items": { + "required": ["header_definition_type"] + }, + { + "title": "Autogenerated", + "type": "object", + "properties": { + "header_definition_type": { + "title": "Header Definition Type", + "default": "Autogenerated", + "const": "Autogenerated", "type": "string" } - } + }, + "required": ["header_definition_type"] }, - "required": ["column_names", "header_definition_type"] - } - ], - "type": "object" - }, - "true_values": { - "title": "True Values", - "description": "A set of case-sensitive strings that should be interpreted as true values.", - "default": ["y", "yes", "t", "true", "on", "1"], - "type": "array", - "items": { - "type": "string" + { + "title": "User Provided", + "type": "object", + "properties": { + "header_definition_type": { + "title": "Header Definition Type", + "default": "User Provided", + "const": "User Provided", + "type": "string" + }, + "column_names": { + "title": "Column Names", + "description": "The column names that will be used while emitting the CSV records", + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "column_names", + "header_definition_type" + ] + } + ], + "type": 
"object" }, - "uniqueItems": true - }, - "false_values": { - "title": "False Values", - "description": "A set of case-sensitive strings that should be interpreted as false values.", - "default": ["n", "no", "f", "false", "off", "0"], - "type": "array", - "items": { - "type": "string" + "true_values": { + "title": "True Values", + "description": "A set of case-sensitive strings that should be interpreted as true values.", + "default": ["y", "yes", "t", "true", "on", "1"], + "type": "array", + "items": { + "type": "string" + }, + "uniqueItems": true }, - "uniqueItems": true - }, - "inference_type": { - "title": "Inference Type", - "description": "How to infer the types of the columns. If none, inference default to strings.", - "default": "None", - "airbyte_hidden": true, - "enum": ["None", "Primitive Types Only"] + "false_values": { + "title": "False Values", + "description": "A set of case-sensitive strings that should be interpreted as false values.", + "default": ["n", "no", "f", "false", "off", "0"], + "type": "array", + "items": { + "type": "string" + }, + "uniqueItems": true + }, + "inference_type": { + "title": "Inference Type", + "description": "How to infer the types of the columns. If none, inference default to strings.", + "default": "None", + "airbyte_hidden": true, + "enum": ["None", "Primitive Types Only"] + }, + "ignore_errors_on_fields_mismatch": { + "title": "Ignore errors on field mismatch", + "description": "Whether to ignore errors that occur when the number of fields in the CSV does not match the number of columns in the schema.", + "default": false, + "type": "boolean" + } }, - "ignore_errors_on_fields_mismatch": { - "title": "Ignore errors on field mismatch", - "description": "Whether to ignore errors that occur when the number of fields in the CSV does not match the number of columns in the schema.", - "default": false, - "type": "boolean" - } + "required": ["filetype"] }, - "required": ["filetype"] - }, - { - "title": "Jsonl Format", - "type": "object", - "properties": { - "filetype": { - "title": "Filetype", - "default": "jsonl", - "const": "jsonl", - "type": "string" - } - }, - "required": ["filetype"] - }, - { - "title": "Parquet Format", - "type": "object", - "properties": { - "filetype": { - "title": "Filetype", - "default": "parquet", - "const": "parquet", - "type": "string" + { + "title": "Jsonl Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "jsonl", + "const": "jsonl", + "type": "string" + } }, - "decimal_as_float": { - "title": "Convert Decimal Fields to Floats", - "description": "Whether to convert decimal fields to floats. There is a loss of precision when converting decimals to floats, so this is not recommended.", - "default": false, - "type": "boolean" - } + "required": ["filetype"] }, - "required": ["filetype"] - }, - { - "title": "Unstructured Document Format", - "type": "object", - "properties": { - "filetype": { - "title": "Filetype", - "default": "unstructured", - "const": "unstructured", - "type": "string" - }, - "skip_unprocessable_files": { - "title": "Skip Unprocessable Files", - "description": "If true, skip files that cannot be parsed and pass the error message along as the _ab_source_file_parse_error field. If false, fail the sync.", - "default": true, - "always_show": true, - "type": "boolean" - }, - "strategy": { - "title": "Parsing Strategy", - "description": "The strategy used to parse documents. `fast` extracts text directly from the document which doesn't work for all files. 
`ocr_only` is more reliable, but slower. `hi_res` is the most reliable, but requires an API key and a hosted instance of unstructured and can't be used with local mode. See the unstructured.io documentation for more details: https://unstructured-io.github.io/unstructured/core/partition.html#partition-pdf", - "default": "auto", - "always_show": true, - "order": 0, - "enum": ["auto", "fast", "ocr_only", "hi_res"], - "type": "string" + { + "title": "Parquet Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "parquet", + "const": "parquet", + "type": "string" + }, + "decimal_as_float": { + "title": "Convert Decimal Fields to Floats", + "description": "Whether to convert decimal fields to floats. There is a loss of precision when converting decimals to floats, so this is not recommended.", + "default": false, + "type": "boolean" + } }, - "processing": { - "title": "Processing", - "description": "Processing configuration", - "default": { - "mode": "local" + "required": ["filetype"] + }, + { + "title": "Unstructured Document Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "unstructured", + "const": "unstructured", + "type": "string" + }, + "skip_unprocessable_files": { + "title": "Skip Unprocessable Files", + "description": "If true, skip files that cannot be parsed and pass the error message along as the _ab_source_file_parse_error field. If false, fail the sync.", + "default": true, + "always_show": true, + "type": "boolean" + }, + "strategy": { + "title": "Parsing Strategy", + "description": "The strategy used to parse documents. `fast` extracts text directly from the document which doesn't work for all files. `ocr_only` is more reliable, but slower. `hi_res` is the most reliable, but requires an API key and a hosted instance of unstructured and can't be used with local mode. See the unstructured.io documentation for more details: https://unstructured-io.github.io/unstructured/core/partition.html#partition-pdf", + "default": "auto", + "always_show": true, + "order": 0, + "enum": ["auto", "fast", "ocr_only", "hi_res"], + "type": "string" }, - "type": "object", - "oneOf": [ - { - "title": "Local", - "type": "object", - "properties": { - "mode": { - "title": "Mode", - "default": "local", - "const": "local", - "enum": ["local"], - "type": "string" - } + "processing": { + "title": "Processing", + "description": "Processing configuration", + "default": { + "mode": "local" + }, + "type": "object", + "oneOf": [ + { + "title": "Local", + "type": "object", + "properties": { + "mode": { + "title": "Mode", + "default": "local", + "const": "local", + "enum": ["local"], + "type": "string" + } + }, + "description": "Process files locally, supporting `fast` and `ocr` modes. This is the default option.", + "required": ["mode"] }, - "description": "Process files locally, supporting `fast` and `ocr` modes. 
This is the default option.", - "required": ["mode"] - } - ] - } - }, - "description": "Extract text from document formats (.pdf, .docx, .md, .pptx) and emit as one record per file.", - "required": ["filetype"] - } - ] - }, - "schemaless": { - "title": "Schemaless", - "description": "When enabled, syncs will not validate or structure records against the stream's schema.", - "default": false, - "type": "boolean" - }, - "recent_n_files_to_read_for_schema_discovery": { - "title": "Files To Read For Schema Discover", - "description": "The number of recent files which will be used to discover the schema for this stream.", - "exclusiveMinimum": 0, - "type": "integer" - } - }, - "required": ["name", "format"] - } - }, - "format": { - "title": "File Format", - "description": "Deprecated and will be removed soon. Please do not use this field anymore and use streams.format instead. The format of the files you'd like to replicate", - "default": "csv", - "order": 120, - "type": "object", - "oneOf": [ - { - "title": "CSV", - "description": "This connector utilises PyArrow (Apache Arrow) for CSV parsing.", - "type": "object", - "properties": { - "filetype": { - "title": "Filetype", - "default": "csv", - "const": "csv", - "type": "string" - }, - "delimiter": { - "title": "Delimiter", - "description": "The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.", - "default": ",", - "minLength": 1, - "order": 0, - "type": "string" - }, - "infer_datatypes": { - "title": "Infer Datatypes", - "description": "Configures whether a schema for the source should be inferred from the current data or not. If set to false and a custom schema is set, then the manually enforced schema is used. If a schema is not manually set, and this is set to false, then all fields will be read as strings", - "default": true, - "order": 1, - "type": "boolean" - }, - "quote_char": { - "title": "Quote Character", - "description": "The character used for quoting CSV values. To disallow quoting, make this field blank.", - "default": "\"", - "order": 2, - "type": "string" - }, - "escape_char": { - "title": "Escape Character", - "description": "The character used for escaping special characters. To disallow escaping, leave this field blank.", - "order": 3, - "type": "string" - }, - "encoding": { - "title": "Encoding", - "description": "The character encoding of the CSV data. Leave blank to default to UTF8. See list of python encodings for allowable options.", - "default": "utf8", - "order": 4, - "type": "string" - }, - "double_quote": { - "title": "Double Quote", - "description": "Whether two quotes in a quoted CSV value denote a single quote in the data.", - "default": true, - "order": 5, - "type": "boolean" - }, - "newlines_in_values": { - "title": "Allow newlines in values", - "description": "Whether newline characters are allowed in CSV values. Turning this on may affect performance. Leave blank to default to False.", - "default": false, - "order": 6, - "type": "boolean" - }, - "additional_reader_options": { - "title": "Additional Reader Options", - "description": "Optionally add a valid JSON string here to provide additional options to the csv reader. Mappings must correspond to options detailed here. 
'column_types' is used internally to handle schema so overriding that would likely cause problems.", - "examples": [ - "{\"timestamp_parsers\": [\"%m/%d/%Y %H:%M\", \"%Y/%m/%d %H:%M\"], \"strings_can_be_null\": true, \"null_values\": [\"NA\", \"NULL\"]}" - ], - "order": 7, - "type": "string" - }, - "advanced_options": { - "title": "Advanced Options", - "description": "Optionally add a valid JSON string here to provide additional Pyarrow ReadOptions. Specify 'column_names' here if your CSV doesn't have header, or if you want to use custom column names. 'block_size' and 'encoding' are already used above, specify them again here will override the values above.", - "examples": ["{\"column_names\": [\"column1\", \"column2\"]}"], - "order": 8, - "type": "string" - }, - "block_size": { - "title": "Block Size", - "description": "The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.", - "default": 10000, - "minimum": 1, - "maximum": 2147483647, - "order": 9, - "type": "integer" - } - } - }, - { - "title": "Parquet", - "description": "This connector utilises PyArrow (Apache Arrow) for Parquet parsing.", - "type": "object", - "properties": { - "filetype": { - "title": "Filetype", - "default": "parquet", - "const": "parquet", - "type": "string" - }, - "columns": { - "title": "Selected Columns", - "description": "If you only want to sync a subset of the columns from the file(s), add the columns you want here as a comma-delimited list. Leave it empty to sync all columns.", - "order": 0, - "type": "array", - "items": { - "type": "string" - } - }, - "batch_size": { - "title": "Record batch size", - "description": "Maximum number of records per batch read from the input files. Batches may be smaller if there aren’t enough rows in the file. This option can help avoid out-of-memory errors if your data is particularly wide.", - "default": 65536, - "order": 1, - "type": "integer" - }, - "buffer_size": { - "title": "Buffer Size", - "description": "Perform read buffering when deserializing individual column chunks. By default every group column will be loaded fully to memory. 
This option can help avoid out-of-memory errors if your data is particularly wide.", - "default": 2, - "type": "integer" - } - } - }, - { - "title": "Avro", - "description": "This connector utilises fastavro for Avro parsing.", - "type": "object", - "properties": { - "filetype": { - "title": "Filetype", - "default": "avro", - "const": "avro", - "type": "string" - } - } - }, - { - "title": "Jsonl", - "description": "This connector uses PyArrow for JSON Lines (jsonl) file parsing.", - "type": "object", - "properties": { - "filetype": { - "title": "Filetype", - "default": "jsonl", - "const": "jsonl", - "type": "string" + { + "title": "via API", + "type": "object", + "properties": { + "mode": { + "title": "Mode", + "default": "api", + "const": "api", + "enum": ["api"], + "type": "string" + }, + "api_key": { + "title": "API Key", + "description": "The API key to use matching the environment", + "default": "", + "always_show": true, + "airbyte_secret": true, + "type": "string" + }, + "api_url": { + "title": "API URL", + "description": "The URL of the unstructured API to use", + "default": "https://api.unstructured.io", + "always_show": true, + "examples": ["https://api.unstructured.com"], + "type": "string" + }, + "parameters": { + "title": "Additional URL Parameters", + "description": "List of parameters send to the API", + "default": [], + "always_show": true, + "type": "array", + "items": { + "title": "APIParameterConfigModel", + "type": "object", + "properties": { + "name": { + "title": "Parameter name", + "description": "The name of the unstructured API parameter to use", + "examples": [ + "combine_under_n_chars", + "languages" + ], + "type": "string" + }, + "value": { + "title": "Value", + "description": "The value of the parameter", + "examples": ["true", "hi_res"], + "type": "string" + } + }, + "required": ["name", "value"] + } + } + }, + "description": "Process files via an API, using the `hi_res` mode. This option is useful for increased performance and accuracy, but requires an API key and a hosted instance of unstructured.", + "required": ["mode"] + } + ] + } + }, + "description": "Extract text from document formats (.pdf, .docx, .md, .pptx) and emit as one record per file.", + "required": ["filetype"] + } + ] }, - "newlines_in_values": { - "title": "Allow newlines in values", - "description": "Whether newline characters are allowed in JSON values. Turning this on may affect performance. Leave blank to default to False.", + "schemaless": { + "title": "Schemaless", + "description": "When enabled, syncs will not validate or structure records against the stream's schema.", "default": false, - "order": 0, "type": "boolean" - }, - "unexpected_field_behavior": { - "title": "Unexpected field behavior", - "description": "How JSON fields outside of explicit_schema (if given) are treated. Check PyArrow documentation for details", - "default": "infer", - "examples": ["ignore", "infer", "error"], - "order": 1, - "enum": ["ignore", "infer", "error"] - }, - "block_size": { - "title": "Block Size", - "description": "The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. 
Beware of raising this too high as you could hit OOM errors.", - "default": 0, - "order": 2, - "type": "integer" } - } + }, + "required": ["name", "format"] } - ], - "airbyte_hidden": true - }, - "service_account": { - "title": "Service Account Information", - "description": "Enter your Google Cloud service account key in JSON format", - "airbyte_secret": true, - "order": 0, - "type": "string" + }, + "service_account": { + "title": "Service Account Information", + "description": "Enter your Google Cloud service account key in JSON format", + "airbyte_secret": true, + "order": 0, + "type": "string" + }, + "bucket": { + "title": "Bucket", + "description": "Name of the GCS bucket where the file(s) exist.", + "order": 2, + "type": "string" + } }, - "bucket": { - "title": "Bucket", - "description": "Name of the GCS bucket where the file(s) exist.", - "order": 2, - "type": "string" - } - }, - "required": ["streams", "service_account", "bucket"] + "required": ["streams", "service_account", "bucket"] + } } } diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py b/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py index 48f12c433fd46..1ee82a3297090 100644 --- a/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py @@ -6,10 +6,10 @@ import itertools import json import logging +import re from datetime import datetime, timedelta from io import IOBase from typing import Iterable, List, Optional -import re import pytz import smart_open From c9c8e34b8f04d4e7cf7cfdbfe67d1c251a2fceb2 Mon Sep 17 00:00:00 2001 From: Erick Corona Date: Thu, 15 Aug 2024 15:43:50 -0600 Subject: [PATCH 05/13] Match spec.json --- .../source-gcs/integration_tests/spec.json | 197 +++++++++++++++++- 1 file changed, 191 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json b/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json index 4f04d69315bd1..47a40eeb762ba 100644 --- a/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json @@ -17,22 +17,22 @@ }, "streams": { "title": "The list of streams to sync", - "description": "Each instance of this configuration defines a stream. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.", - "order": 3, + "description": "Each instance of this configuration defines a stream. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.", + "order": 10, "type": "array", "items": { - "title": "SourceGCSStreamConfig", + "title": "FileBasedStreamConfig", "type": "object", "properties": { "name": { "title": "Name", "description": "The name of the stream.", - "order": 0, "type": "string" }, "globs": { "title": "Globs", "description": "The pattern used to specify which files should be selected from the file system. 
For more information on glob pattern matching look here.", + "default": ["**"], "order": 1, "type": "array", "items": { @@ -41,7 +41,7 @@ }, "legacy_prefix": { "title": "Legacy Prefix", - "description": "The path prefix configured in previous versions of the GCS connector. This option is deprecated in favor of a single glob.", + "description": "The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.", "airbyte_hidden": true, "type": "string" }, @@ -71,9 +71,27 @@ "format": { "title": "Format", "description": "The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.", - "order": 2, "type": "object", "oneOf": [ + { + "title": "Avro Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "avro", + "const": "avro", + "type": "string" + }, + "double_as_string": { + "title": "Convert Double Fields to Strings", + "description": "Whether to convert double fields to strings. This is recommended if you have decimal numbers with a high degree of precision because there can be a loss precision when handling floating point numbers.", + "default": false, + "type": "boolean" + } + }, + "required": ["filetype"] + }, { "title": "CSV Format", "type": "object", @@ -233,6 +251,173 @@ } }, "required": ["filetype"] + }, + { + "title": "Jsonl Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "jsonl", + "const": "jsonl", + "type": "string" + } + }, + "required": ["filetype"] + }, + { + "title": "Parquet Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "parquet", + "const": "parquet", + "type": "string" + }, + "decimal_as_float": { + "title": "Convert Decimal Fields to Floats", + "description": "Whether to convert decimal fields to floats. There is a loss of precision when converting decimals to floats, so this is not recommended.", + "default": false, + "type": "boolean" + } + }, + "required": ["filetype"] + }, + { + "title": "Unstructured Document Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "unstructured", + "const": "unstructured", + "type": "string" + }, + "skip_unprocessable_files": { + "title": "Skip Unprocessable Files", + "description": "If true, skip files that cannot be parsed and pass the error message along as the _ab_source_file_parse_error field. If false, fail the sync.", + "default": true, + "always_show": true, + "type": "boolean" + }, + "strategy": { + "title": "Parsing Strategy", + "description": "The strategy used to parse documents. `fast` extracts text directly from the document which doesn't work for all files. `ocr_only` is more reliable, but slower. `hi_res` is the most reliable, but requires an API key and a hosted instance of unstructured and can't be used with local mode. 
See the unstructured.io documentation for more details: https://unstructured-io.github.io/unstructured/core/partition.html#partition-pdf", + "default": "auto", + "always_show": true, + "order": 0, + "enum": ["auto", "fast", "ocr_only", "hi_res"], + "type": "string" + }, + "processing": { + "title": "Processing", + "description": "Processing configuration", + "default": { + "mode": "local" + }, + "type": "object", + "discriminator": { + "propertyName": "mode", + "mapping": { + "local": "#/definitions/LocalProcessingConfigModel", + "api": "#/definitions/APIProcessingConfigModel" + } + }, + "oneOf": [ + { + "title": "Local", + "type": "object", + "properties": { + "mode": { + "title": "Mode", + "default": "local", + "const": "local", + "enum": ["local"], + "type": "string" + } + }, + "description": "Process files locally, supporting `fast` and `ocr` modes. This is the default option.", + "required": ["mode"] + }, + { + "title": "via API", + "type": "object", + "properties": { + "mode": { + "title": "Mode", + "default": "api", + "const": "api", + "enum": ["api"], + "type": "string" + }, + "api_key": { + "title": "API Key", + "description": "The API key to use matching the environment", + "default": "", + "always_show": true, + "airbyte_secret": true, + "type": "string" + }, + "api_url": { + "title": "API URL", + "description": "The URL of the unstructured API to use", + "default": "https://api.unstructured.io", + "always_show": true, + "examples": ["https://api.unstructured.com"], + "type": "string" + }, + "parameters": { + "title": "Additional URL Parameters", + "description": "List of parameters send to the API", + "default": [], + "always_show": true, + "type": "array", + "items": { + "title": "APIParameterConfigModel", + "type": "object", + "properties": { + "name": { + "title": "Parameter name", + "description": "The name of the unstructured API parameter to use", + "examples": [ + "combine_under_n_chars", + "languages" + ], + "type": "string" + }, + "value": { + "title": "Value", + "description": "The value of the parameter", + "examples": ["true", "hi_res"], + "type": "string" + } + }, + "required": ["name", "value"] + } + } + }, + "description": "Process files via an API, using the `hi_res` mode. 
This option is useful for increased performance and accuracy, but requires an API key and a hosted instance of unstructured.", + "required": ["mode"] + } + ] + } + }, + "description": "Extract text from document formats (.pdf, .docx, .md, .pptx) and emit as one record per file.", + "required": ["filetype"] + }, + { + "title": "Excel Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "excel", + "const": "excel", + "type": "string" + } + }, + "required": ["filetype"] } ] }, From f3a6630f2d5e95b3f5f20bb2b7387670c85d9738 Mon Sep 17 00:00:00 2001 From: Erick Corona Date: Thu, 15 Aug 2024 16:11:01 -0600 Subject: [PATCH 06/13] Enable reading from all FileBasedSpec supported types --- .../source-gcs/acceptance-test-config.yml | 4 +- .../integration_tests/abnormal_state.json | 14 +++++ .../integration_tests/configured_catalog.json | 9 ++++ .../connectors/source-gcs/metadata.yaml | 2 +- .../connectors/source-gcs/pyproject.toml | 2 +- .../source-gcs/source_gcs/config.py | 53 ++----------------- .../source-gcs/source_gcs/stream_reader.py | 4 +- 7 files changed, 32 insertions(+), 56 deletions(-) diff --git a/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml b/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml index 23756e607992e..1baaf9b7f0130 100644 --- a/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml @@ -6,7 +6,7 @@ acceptance_tests: tests: - spec_path: integration_tests/spec.json backward_compatibility_tests_config: - disable_for_version: 0.2.0 + disable_for_version: 0.5.0 connection: tests: - config_path: "secrets/config.json" @@ -21,6 +21,8 @@ acceptance_tests: tests: - config_path: "secrets/config.json" expect_trace_message_on_failure: false + - config_path: "secrets/config_jsonl.json" + expect_trace_message_on_failure: false incremental: tests: - config_path: "secrets/config.json" diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json index 6e138a1673cce..42e2767be1b7e 100644 --- a/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json @@ -40,5 +40,19 @@ "name": "example_gzip" } } + }, + { + "type": "STREAM", + "stream": { + "stream_state": { + "_ab_source_file_last_modified": "2094-03-21T16:13:20.571000Z_https://storage.googleapis.com/airbyte-integration-test-source-gcs/test_folder/test_data_1.jsonl", + "history": { + "https://storage.googleapis.com/airbyte-integration-test-source-gcs/test_folder/test_data_1.jsonl": "2094-03-21T16:13:20.571000Z" + } + }, + "stream_descriptor": { + "name": "example_1_jsonl" + } + } } ] diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json index 976f67d776d72..09ab18b2ea2cc 100644 --- a/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json @@ -35,6 +35,15 @@ }, "sync_mode": "incremental", "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "example_1_jsonl", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "incremental", + 
"destination_sync_mode": "overwrite" } ] } diff --git a/airbyte-integrations/connectors/source-gcs/metadata.yaml b/airbyte-integrations/connectors/source-gcs/metadata.yaml index 54378d62919b2..cb9f1f6837696 100644 --- a/airbyte-integrations/connectors/source-gcs/metadata.yaml +++ b/airbyte-integrations/connectors/source-gcs/metadata.yaml @@ -7,7 +7,7 @@ data: connectorSubtype: file connectorType: source definitionId: 2a8c41ae-8c23-4be0-a73f-2ab10ca1a820 - dockerImageTag: 0.5.0 + dockerImageTag: 0.6.0 dockerRepository: airbyte/source-gcs documentationUrl: https://docs.airbyte.com/integrations/sources/gcs githubIssueLabel: source-gcs diff --git a/airbyte-integrations/connectors/source-gcs/pyproject.toml b/airbyte-integrations/connectors/source-gcs/pyproject.toml index faadf8c73ee81..342f289383ab9 100644 --- a/airbyte-integrations/connectors/source-gcs/pyproject.toml +++ b/airbyte-integrations/connectors/source-gcs/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "0.5.0" +version = "0.6.0" name = "source-gcs" description = "Source implementation for Gcs." authors = [ "Airbyte ",] diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/config.py b/airbyte-integrations/connectors/source-gcs/source_gcs/config.py index 8699e0f461d0d..906e952973cca 100644 --- a/airbyte-integrations/connectors/source-gcs/source_gcs/config.py +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/config.py @@ -3,35 +3,10 @@ # -from typing import List, Optional - from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec -from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig from pydantic.v1 import AnyUrl, Field -class SourceGCSStreamConfig(FileBasedStreamConfig): - name: str = Field(title="Name", description="The name of the stream.", order=0) - globs: Optional[List[str]] = Field( - title="Globs", - description="The pattern used to specify which files should be selected from the file system. For more information on glob " - 'pattern matching look here.', - order=1, - ) - # format: CsvFormat = Field( - # title="Format", - # description="The configuration options that are used to alter how to read incoming files that deviate from " - # "the standard formatting.", - # order=2, - # ) - legacy_prefix: Optional[str] = Field( - title="Legacy Prefix", - description="The path prefix configured in previous versions of the GCS connector. " - "This option is deprecated in favor of a single glob.", - airbyte_hidden=True, - ) - - class Config(AbstractFileBasedSpec): """ NOTE: When this Spec is changed, legacy_config_transformer.py must also be @@ -52,17 +27,6 @@ class Config(AbstractFileBasedSpec): bucket: str = Field(title="Bucket", description="Name of the GCS bucket where the file(s) exist.", order=2) - # streams: List[SourceGCSStreamConfig] = Field( - # title="The list of streams to sync", - # description=( - # "Each instance of this configuration defines a stream. " - # "Use this to define which files belong in the stream, their format, and how they should be " - # "parsed and validated. When sending data to warehouse destination such as Snowflake or " - # "BigQuery, each stream is a separate table." 
- # ), - # order=3, - # ) - @classmethod def documentation_url(cls) -> AnyUrl: """ @@ -70,17 +34,6 @@ def documentation_url(cls) -> AnyUrl: """ return AnyUrl("https://docs.airbyte.com/integrations/sources/gcs", scheme="https") - # @staticmethod - # def replace_enum_allOf_and_anyOf(schema): - # """ - # Replace allOf with anyOf when appropriate in the schema with one value. - # """ - # objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"] - # if len(objects_to_check.get("allOf", [])) == 1: - # objects_to_check["anyOf"] = objects_to_check.pop("allOf") - # - # return super(Config, Config).replace_enum_allOf_and_anyOf(schema) - - # @staticmethod - # def remove_discriminator(schema) -> None: - # pass + @staticmethod + def remove_discriminator(schema) -> None: + pass diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py b/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py index 1ee82a3297090..912f13be66f24 100644 --- a/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py @@ -6,7 +6,6 @@ import itertools import json import logging -import re from datetime import datetime, timedelta from io import IOBase from typing import Iterable, List, Optional @@ -24,7 +23,6 @@ "We don't have access to {uri}. The file appears to have become unreachable during sync." "Check whether key {uri} exists in `{bucket}` bucket and/or has proper ACL permissions" ) -FILE_FORMATS = "avro|csv|jsonl|tsv|parquet" class SourceGCSStreamReader(AbstractFileBasedStreamReader): @@ -78,7 +76,7 @@ def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: lo for blob in blobs: last_modified = blob.updated.astimezone(pytz.utc).replace(tzinfo=None) - if re.search(FILE_FORMATS, blob.name.lower()) and (not start_date or last_modified >= start_date): + if not start_date or last_modified >= start_date: uri = blob.generate_signed_url(expiration=timedelta(hours=1), version="v4") file_extension = ".".join(blob.name.split(".")[1:]) From 13c26325589117f2d215a5dee5c6e34654b8b794 Mon Sep 17 00:00:00 2001 From: Erick Corona Date: Thu, 15 Aug 2024 16:29:07 -0600 Subject: [PATCH 07/13] Add changelog entry for source-gcs 0.6.0 --- docs/integrations/sources/gcs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/sources/gcs.md b/docs/integrations/sources/gcs.md index df32aa1798b62..6b450d8fe0b8b 100644 --- a/docs/integrations/sources/gcs.md +++ b/docs/integrations/sources/gcs.md @@ -151,6 +151,7 @@ Leaving this field blank (default option) will disallow escaping. 
| Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------| +| 0.6.0 | 2024-08-15 | [44015](https://github.com/airbytehq/airbyte/pull/44015) | Add support for all FileBasedSpec file types | | 0.5.0 | 2024-08-14 | [44070](https://github.com/airbytehq/airbyte/pull/44070) | Update CDK v4 and Python 3.10 dependencies | | 0.4.15 | 2024-08-12 | [43733](https://github.com/airbytehq/airbyte/pull/43733) | Update dependencies | | 0.4.14 | 2024-08-10 | [43512](https://github.com/airbytehq/airbyte/pull/43512) | Update dependencies | From 50d5cd48438441b769830dac1c0fded1ab44ee0f Mon Sep 17 00:00:00 2001 From: Erick Corona Date: Thu, 15 Aug 2024 17:19:29 -0600 Subject: [PATCH 08/13] Add JSONL specific creds file --- .../connectors/source-gcs/acceptance-test-config.yml | 2 -- airbyte-integrations/connectors/source-gcs/metadata.yaml | 5 +++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml b/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml index 1baaf9b7f0130..b73e6d9a99780 100644 --- a/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml @@ -21,8 +21,6 @@ acceptance_tests: tests: - config_path: "secrets/config.json" expect_trace_message_on_failure: false - - config_path: "secrets/config_jsonl.json" - expect_trace_message_on_failure: false incremental: tests: - config_path: "secrets/config.json" diff --git a/airbyte-integrations/connectors/source-gcs/metadata.yaml b/airbyte-integrations/connectors/source-gcs/metadata.yaml index cb9f1f6837696..a025e5009e5bd 100644 --- a/airbyte-integrations/connectors/source-gcs/metadata.yaml +++ b/airbyte-integrations/connectors/source-gcs/metadata.yaml @@ -47,4 +47,9 @@ data: secretStore: type: GSM alias: airbyte-connector-testing-secret-store + - name: SECRET_SOURCE-GCS__JSONL + fileName: config_jsonl.json + secretStore: + type: GSM + alias: airbyte-connector-testing-secret-store metadataSpecVersion: "1.0" From 179bba6aaf77e5f7c19c4e720c2a7e4e6455e9e0 Mon Sep 17 00:00:00 2001 From: Erick Corona Date: Thu, 15 Aug 2024 17:40:08 -0600 Subject: [PATCH 09/13] Remove invalid creds file --- airbyte-integrations/connectors/source-gcs/metadata.yaml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-gcs/metadata.yaml b/airbyte-integrations/connectors/source-gcs/metadata.yaml index a025e5009e5bd..cb9f1f6837696 100644 --- a/airbyte-integrations/connectors/source-gcs/metadata.yaml +++ b/airbyte-integrations/connectors/source-gcs/metadata.yaml @@ -47,9 +47,4 @@ data: secretStore: type: GSM alias: airbyte-connector-testing-secret-store - - name: SECRET_SOURCE-GCS__JSONL - fileName: config_jsonl.json - secretStore: - type: GSM - alias: airbyte-connector-testing-secret-store metadataSpecVersion: "1.0" From 82ce0da106949b8795ff7822e2a0d3a2cdfb4925 Mon Sep 17 00:00:00 2001 From: Erick Corona Date: Thu, 15 Aug 2024 17:57:00 -0600 Subject: [PATCH 10/13] Remove duplicated jsonl entry --- .../source-gcs/integration_tests/configured_catalog.json | 9 --------- 1 file changed, 9 deletions(-) diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json index 
09ab18b2ea2cc..a542ea8c12901 100644 --- a/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json @@ -1,14 +1,5 @@ { "streams": [ - { - "stream": { - "name": "example_1_jsonl", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"] - }, - "sync_mode": "incremental", - "destination_sync_mode": "overwrite" - }, { "stream": { "name": "example_1", From cab1cdb6bb8daaa67063002e255dd6e605326c46 Mon Sep 17 00:00:00 2001 From: Erick Corona Date: Thu, 15 Aug 2024 18:25:43 -0600 Subject: [PATCH 11/13] Fix reference to jsonl config --- airbyte-integrations/connectors/source-gcs/metadata.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/airbyte-integrations/connectors/source-gcs/metadata.yaml b/airbyte-integrations/connectors/source-gcs/metadata.yaml index cb9f1f6837696..5253f98fa8a09 100644 --- a/airbyte-integrations/connectors/source-gcs/metadata.yaml +++ b/airbyte-integrations/connectors/source-gcs/metadata.yaml @@ -47,4 +47,9 @@ data: secretStore: type: GSM alias: airbyte-connector-testing-secret-store + - name: SECRET_SOURCE-GCS_JSONL__CREDS + fileName: config_jsonl.json + secretStore: + type: GSM + alias: airbyte-connector-testing-secret-store metadataSpecVersion: "1.0" From 97e4e4488928a9e35a19e4ffcac3232c9225f360 Mon Sep 17 00:00:00 2001 From: Erick Corona Date: Fri, 16 Aug 2024 11:58:23 -0600 Subject: [PATCH 12/13] Add JSONL-specific CAT tests --- .../source-gcs/acceptance-test-config.yml | 14 ++++++++++++++ .../integration_tests/abnormal_state.json | 14 -------------- .../integration_tests/abnormal_state_jsonl.json | 16 ++++++++++++++++ .../integration_tests/configured_catalog.json | 9 --------- .../configured_catalog_jsonl.json | 13 +++++++++++++ 5 files changed, 43 insertions(+), 23 deletions(-) create mode 100644 airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state_jsonl.json create mode 100644 airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog_jsonl.json diff --git a/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml b/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml index b73e6d9a99780..65513c1a79421 100644 --- a/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml @@ -17,16 +17,24 @@ acceptance_tests: tests: - config_path: "secrets/config.json" timeout_seconds: 2400 + - config_path: "secrets/config_jsonl.json" + timeout_seconds: 2400 basic_read: tests: - config_path: "secrets/config.json" expect_trace_message_on_failure: false + - config_path: "secrets/config_jsonl.json" + expect_trace_message_on_failure: false incremental: tests: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" future_state: future_state_path: "integration_tests/abnormal_state.json" + - config_path: "secrets/config_jsonl.json" + configured_catalog_path: "integration_tests/configured_catalog_jsonl.json" + future_state: + future_state_path: "integration_tests/abnormal_state_jsonl.json" full_refresh: tests: - config_path: "secrets/config.json" @@ -38,3 +46,9 @@ acceptance_tests: example_2: - name: _ab_source_file_url bypass_reason: "Uri has autogenerated token in query params" + - config_path: "secrets/config_jsonl.json" + configured_catalog_path: "integration_tests/configured_catalog_jsonl.json" + ignored_fields: + example_1: + - 
name: _ab_source_file_url + bypass_reason: "Uri has autogenerated token in query params" diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json index 42e2767be1b7e..6e138a1673cce 100644 --- a/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json @@ -40,19 +40,5 @@ "name": "example_gzip" } } - }, - { - "type": "STREAM", - "stream": { - "stream_state": { - "_ab_source_file_last_modified": "2094-03-21T16:13:20.571000Z_https://storage.googleapis.com/airbyte-integration-test-source-gcs/test_folder/test_data_1.jsonl", - "history": { - "https://storage.googleapis.com/airbyte-integration-test-source-gcs/test_folder/test_data_1.jsonl": "2094-03-21T16:13:20.571000Z" - } - }, - "stream_descriptor": { - "name": "example_1_jsonl" - } - } } ] diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state_jsonl.json b/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state_jsonl.json new file mode 100644 index 0000000000000..43e611099f990 --- /dev/null +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state_jsonl.json @@ -0,0 +1,16 @@ +[ + { + "type": "STREAM", + "stream": { + "stream_state": { + "_ab_source_file_last_modified": "2094-03-21T16:13:20.571000Z_https://storage.googleapis.com/airbyte-integration-test-source-gcs/test_folder/test_data_1.jsonl", + "history": { + "https://storage.googleapis.com/airbyte-integration-test-source-gcs/test_folder/test_data_1.jsonl": "2094-03-21T16:13:20.571000Z" + } + }, + "stream_descriptor": { + "name": "example_1_jsonl" + } + } + } +] diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json index a542ea8c12901..b772480244ee2 100644 --- a/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json @@ -26,15 +26,6 @@ }, "sync_mode": "incremental", "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "example_1_jsonl", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"] - }, - "sync_mode": "incremental", - "destination_sync_mode": "overwrite" } ] } diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog_jsonl.json b/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog_jsonl.json new file mode 100644 index 0000000000000..b797feea854c0 --- /dev/null +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog_jsonl.json @@ -0,0 +1,13 @@ +{ + "streams": [ + { + "stream": { + "name": "example_1_jsonl", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" + } + ] +} From 9ee8a3916f2743afc66b50e158894c25ab487b7f Mon Sep 17 00:00:00 2001 From: Erick Corona Date: Fri, 16 Aug 2024 16:22:32 -0600 Subject: [PATCH 13/13] Remove message limiting only CSV in gcs documentation --- docs/integrations/sources/gcs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/sources/gcs.md b/docs/integrations/sources/gcs.md index 6b450d8fe0b8b..80af8c3a90ed6 100644 --- 
a/docs/integrations/sources/gcs.md +++ b/docs/integrations/sources/gcs.md @@ -33,7 +33,7 @@ Use the service account ID from above, grant read access to your target bucket. - Enter your GCS bucket name to the `Bucket` field - Add a stream 1. Give a **Name** to the stream - 2. In the **Format** box, use the dropdown menu to select the format of the files you'd like to replicate. The supported format is **CSV**. Toggling the **Optional fields** button within the **Format** box will allow you to enter additional configurations based on the selected format. For a detailed breakdown of these settings, refer to the [File Format section](#file-format-settings) below. + 2. In the **Format** box, use the dropdown menu to select the format of the files you'd like to replicate. Toggling the **Optional fields** button within the **Format** box will allow you to enter additional configurations based on the selected format. For a detailed breakdown of these settings, refer to the [File Format section](#file-format-settings) below. 3. Optionally, enter the **Globs** which dictates which files to be synced. This is a regular expression that allows Airbyte to pattern match the specific files to replicate. If you are replicating all the files within your bucket, use `**` as the pattern. For more precise pattern matching options, refer to the [Path Patterns section](#path-patterns) below. 4. (Optional) - If you want to enforce a specific schema, you can enter a **Input schema**. By default, this value is set to `{}` and will automatically infer the schema from the file\(s\) you are replicating. For details on providing a custom schema, refer to the [User Schema section](#user-schema). - Configure the optional **Start Date** parameter that marks a starting date and time in UTC for data replication. Any files that have _not_ been modified since this specified date/time will _not_ be replicated. Use the provided datepicker (recommended) or enter the desired date programmatically in the format `YYYY-MM-DDTHH:mm:ssZ`. Leaving this field blank will replicate data from all files that have not been excluded by the **Path Pattern** and **Path Prefix**.
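
A note for reviewers on the stream_reader.py hunks in this series: once the FILE_FORMATS regex is dropped, get_matching_files no longer filters blobs by extension at all — every object that passes the start-date cutoff is handed to the CDK's glob matching and then to whichever parser the stream's format selects. The sketch below is illustrative only: the standalone helper name iter_recent_blobs is an assumption for illustration, while the blob.updated conversion and the generate_signed_url call mirror the code shown in the diff.

    # Illustrative sketch, not the connector's real module layout.
    from datetime import datetime, timedelta
    from typing import Iterable, Optional, Tuple

    import pytz


    def iter_recent_blobs(blobs, start_date: Optional[datetime]) -> Iterable[Tuple[str, datetime]]:
        """Yield (signed_uri, last_modified) for each blob newer than start_date, regardless of extension."""
        for blob in blobs:
            last_modified = blob.updated.astimezone(pytz.utc).replace(tzinfo=None)
            # No file-format check any more: csv, jsonl, avro, parquet, excel and
            # unstructured documents all flow through to the file-based CDK parsers.
            if not start_date or last_modified >= start_date:
                uri = blob.generate_signed_url(expiration=timedelta(hours=1), version="v4")
                yield uri, last_modified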
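
For anyone trying the change locally, a source config that exercises the newly exposed JSONL parser could look roughly like the following sketch (written as a Python dict for readability; the bucket name and glob are placeholders, while the field names follow the spec.json hunks above). A secrets/config_jsonl.json with this shape is what the new config_jsonl.json references in acceptance-test-config.yml and metadata.yaml appear to expect.

    # Hypothetical config for a JSONL stream -- all values are placeholders.
    example_jsonl_config = {
        "service_account": "<service-account-key-json>",  # contents of the GCP key file
        "bucket": "my-gcs-bucket",
        "streams": [
            {
                "name": "example_1_jsonl",
                "globs": ["test_folder/*.jsonl"],
                "format": {"filetype": "jsonl"},
            }
        ],
    }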