
Commit 1d3a17a

Authored by Phlair, jzhuan-icims, and davinchia

🎉 Source S3 - memory & performance optimisations + advanced CSV options (#6615)

* memory & performance optimisations
* address comments
* version bump
* added advanced_options for reading csv without header, and more custom pyarrow ReadOptions
* updated to use the latest airbyte-cdk
* updated docs
* bump source-s3 to 0.1.6
* remove unneeded lines
* Use the all dep ami for python builds.
* ec2-instance-id should be ec2-image-id
* ec2-instance-id should be ec2-image-id

Co-authored-by: Jingkun Zhuang <[email protected]>
Co-authored-by: Davin Chia <[email protected]>

1 parent 25110c1 commit 1d3a17a

File tree

15 files changed: +168, -70 lines

.github/workflows/publish-command.yml (+1)

@@ -34,6 +34,7 @@ jobs:
           aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
           aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
           github-token: ${{ secrets.SELF_RUNNER_GITHUB_ACCESS_TOKEN }}
+          ec2-image-id: ami-0d648081937c75a73
   publish-image:
     needs: start-publish-image-runner
     runs-on: ${{ needs.start-publish-image-runner.outputs.label }}

.github/workflows/test-command.yml (+1)

@@ -33,6 +33,7 @@ jobs:
           aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
           aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
           github-token: ${{ secrets.SELF_RUNNER_GITHUB_ACCESS_TOKEN }}
+          ec2-image-id: ami-0d648081937c75a73
   integration-test:
     timeout-minutes: 240
     needs: start-test-runner

airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json (+1, -1)

@@ -2,6 +2,6 @@
   "sourceDefinitionId": "69589781-7828-43c5-9f63-8925b1c1ccc2",
   "name": "S3",
   "dockerRepository": "airbyte/source-s3",
-  "dockerImageTag": "0.1.5",
+  "dockerImageTag": "0.1.6",
   "documentationUrl": "https://docs.airbyte.io/integrations/sources/s3"
 }

airbyte-config/init/src/main/resources/seed/source_definitions.yaml (+1, -1)

@@ -85,7 +85,7 @@
 - sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
   name: S3
   dockerRepository: airbyte/source-s3
-  dockerImageTag: 0.1.5
+  dockerImageTag: 0.1.6
   documentationUrl: https://docs.airbyte.io/integrations/sources/s3
   sourceType: file
 - sourceDefinitionId: fbb5fbe2-16ad-4cf4-af7d-ff9d9c316c87

airbyte-integrations/connectors/source-s3/Dockerfile (+1, -1)

@@ -17,7 +17,7 @@ COPY source_s3 ./source_s3
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.5
+LABEL io.airbyte.version=0.1.6
 LABEL io.airbyte.name=airbyte/source-s3

airbyte-integrations/connectors/source-s3/integration_tests/spec.json (+9)

@@ -93,6 +93,15 @@
             "{\"timestamp_parsers\": [\"%m/%d/%Y %H:%M\", \"%Y/%m/%d %H:%M\"], \"strings_can_be_null\": true, \"null_values\": [\"NA\", \"NULL\"]}"
           ],
           "type": "string"
+        },
+        "advanced_options": {
+          "title": "Advanced Options",
+          "description": "Optionally add a valid JSON string here to provide additional <a href=\"https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html#pyarrow.csv.ReadOptions\" target=\"_blank\">Pyarrow ReadOptions</a>. Specify 'column_names' here if your CSV doesn't have a header, or if you want to use custom column names. 'block_size' and 'encoding' are already used above; specifying them again here will override the values above.",
+          "default": "{}",
+          "examples": [
+            "{\"column_names\": [\"column1\", \"column2\"]}"
+          ],
+          "type": "string"
         }
       }
     },

airbyte-integrations/connectors/source-s3/setup.py (+1, -1)

@@ -6,7 +6,7 @@
 from setuptools import find_packages, setup
 
 MAIN_REQUIREMENTS = [
-    "airbyte-cdk~=0.1.7",
+    "airbyte-cdk~=0.1.28",
     "pyarrow==4.0.1",
     "smart-open[s3]==5.1.0",
     "wcmatch==8.2",

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py (+4, -1)

@@ -28,7 +28,10 @@ def _read_options(self):
         https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html
         build ReadOptions object like: pa.csv.ReadOptions(**self._read_options())
         """
-        return {"block_size": self._format.get("block_size", 10000), "encoding": self._format.get("encoding", "utf8")}
+        return {
+            **{"block_size": self._format.get("block_size", 10000), "encoding": self._format.get("encoding", "utf8")},
+            **json.loads(self._format.get("advanced_options", "{}")),
+        }
 
     def _parse_options(self):
         """

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py (+5)

@@ -50,3 +50,8 @@ class Config:
             '{"timestamp_parsers": ["%m/%d/%Y %H:%M", "%Y/%m/%d %H:%M"], "strings_can_be_null": true, "null_values": ["NA", "NULL"]}'
         ],
     )
+    advanced_options: str = Field(
+        default="{}",
+        description="Optionally add a valid JSON string here to provide additional <a href=\"https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html#pyarrow.csv.ReadOptions\" target=\"_blank\">Pyarrow ReadOptions</a>. Specify 'column_names' here if your CSV doesn't have a header, or if you want to use custom column names. 'block_size' and 'encoding' are already used above; specifying them again here will override the values above.",
+        examples=['{"column_names": ["column1", "column2"]}'],
+    )

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py (+96, -58)

Large diffs are not rendered by default.

airbyte-integrations/connectors/source-s3/source_s3/stream.py (+1, -1)

@@ -36,7 +36,7 @@ def _list_bucket(self, accept_key=lambda k: True) -> Iterator[str]:
         else:
             session = boto3session.Session()
             client_config = Config(signature_version=UNSIGNED)
-            client = make_s3_client(self._provider, config=client_config, session=session)
+            client = make_s3_client(provider, config=client_config, session=session)
 
         ctoken = None
         while True:
airbyte-integrations/connectors/source-s3/…/csv/test_file_8_no_header.csv (new file, +8)

@@ -0,0 +1,8 @@
+1,PVdhmjb1,False,12,-31.3,2021-07-14,2021-07-14 15:30:09.224125
+2,j4DyXTS7,True,-8,41.6,2021-07-14,2021-07-14 15:30:09.224383
+3,v0w8fTME,False,7,-27.5,2021-07-14,2021-07-14 15:30:09.224527
+4,1q6jD8Np,False,-8,-6.7,2021-07-14,2021-07-14 15:30:09.224741
+5,77h4aiMP,True,-15,-13.7,2021-07-14,2021-07-14 15:30:09.224907
+6,Le35Wyic,True,3,35.3,2021-07-14,2021-07-14 15:30:09.225033
+7,xZhh1Kyl,False,10,-9.2,2021-07-14,2021-07-14 15:30:09.225145
+8,M2t286iJ,False,4,-3.5,2021-07-14,2021-07-14 15:30:09.225320

airbyte-integrations/connectors/source-s3/unit_tests/test_csv_parser.py (+27)

@@ -2,6 +2,7 @@
 # Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 #
 
+import json
 import os
 from pathlib import Path
 from typing import Any, List, Mapping

@@ -249,4 +250,30 @@ def test_files(self) -> List[Mapping[str, Any]]:
                 "line_checks": {},
                 "fails": ["test_get_inferred_schema", "test_stream_records"],
             },
+            {
+                # no header test
+                "test_alias": "no header csv file",
+                "AbstractFileParser": CsvParser(
+                    format={
+                        "filetype": "csv",
+                        "advanced_options": json.dumps(
+                            {"column_names": ["id", "name", "valid", "code", "degrees", "birthday", "last_seen"]}
+                        ),
+                    },
+                    master_schema={},
+                ),
+                "filepath": os.path.join(SAMPLE_DIRECTORY, "csv/test_file_8_no_header.csv"),
+                "num_records": 8,
+                "inferred_schema": {
+                    "id": "integer",
+                    "name": "string",
+                    "valid": "boolean",
+                    "code": "integer",
+                    "degrees": "number",
+                    "birthday": "string",
+                    "last_seen": "string",
+                },
+                "line_checks": {},
+                "fails": [],
+            },
         ]

airbyte-webapp/src/App.tsx (+3)

@@ -44,6 +44,9 @@ const Features: Feature[] = [
   {
     id: FeatureItem.AllowUpdateConnectors,
   },
+  {
+    id: FeatureItem.AllowOAuthConnector,
+  },
 ];
 
 const StyleProvider: React.FC = ({ children }) => (

docs/integrations/sources/s3.md (+9, -6)

@@ -181,14 +181,16 @@ Since CSV files are effectively plain text, providing specific reader options is
 * `double_quote` : Whether two quotes in a quoted CSV value denote a single quote in the data.
 * `newlines_in_values` : Sometimes referred to as `multiline`. In most cases, newline characters signal the end of a row in a CSV, however text data may contain newline characters within it. Setting this to True allows correct parsing in this case.
 * `block_size` : This is the number of bytes to process in memory at a time while reading files. The default value here is usually fine but if your table is particularly wide \(lots of columns / data in fields is large\) then raising this might solve failures on detecting schema. Since this defines how much data to read into memory, raising this too high could cause Out Of Memory issues so use with caution.
+* `additional_reader_options` : This allows for editing the less commonly required CSV [ConvertOptions](https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html#pyarrow.csv.ConvertOptions). The value must be a valid JSON string, e.g.:
 
-The final setting in the UI is `additional_reader_options`. This is a catch-all to allow for editing the less commonly required CSV parsing options. The value must be a valid JSON string, e.g.:
+```text
+{"timestamp_parsers": ["%m/%d/%Y %H:%M", "%Y/%m/%d %H:%M"], "strings_can_be_null": true, "null_values": ["NA", "NULL"]}
+```
+* `advanced_options` : This allows for editing the less commonly required CSV [ReadOptions](https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html#pyarrow.csv.ReadOptions). The value must be a valid JSON string. One use case for this is when your CSV has no header, or when you want to use custom column names; you can specify `column_names` using this option, e.g.:
 
-```text
-{"timestamp_parsers": ["%m/%d/%Y %H:%M", "%Y/%m/%d %H:%M"], "strings_can_be_null": true, "null_values": ["NA", "NULL"]}
-```
-
-You can find details on [available options here](https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html#pyarrow.csv.ConvertOptions).
+```text
+{"column_names": ["column1", "column2", "column3"]}
+```
 
 #### Parquet

@@ -204,6 +206,7 @@ You can find details on [here](https://arrow.apache.org/docs/python/generated/py
 
 | Version | Date | Pull Request | Subject |
 | :--- | :--- | :--- | :--- |
+| 0.1.6 | 2021-10-15 | [6615](https://github.com/airbytehq/airbyte/pull/6615) & [7058](https://github.com/airbytehq/airbyte/pull/7058) | Memory and performance optimisation. Advanced options for CSV parsing. |
 | 0.1.5 | 2021-09-24 | [6398](https://github.com/airbytehq/airbyte/pull/6398) | Support custom non Amazon S3 services |
 | 0.1.4 | 2021-08-13 | [5305](https://github.com/airbytehq/airbyte/pull/5305) | Support of Parquet format |
 | 0.1.3 | 2021-08-04 | [5197](https://github.com/airbytehq/airbyte/pull/5197) | Fixed bug where sync could hang indefinitely on schema inference |
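To make the `column_names` use case above concrete, here is a minimal pyarrow-only sketch of what the connector effectively does with that setting when reading a headerless CSV; the file name below is illustrative, not from this commit:

```python
import pyarrow.csv as pa_csv

# With column_names set, pyarrow treats the first line of the file as data
# rather than as a header row, and names the columns as given.
read_options = pa_csv.ReadOptions(column_names=["column1", "column2", "column3"])

table = pa_csv.read_csv("data_no_header.csv", read_options=read_options)
print(table.schema)  # column types are still inferred per column
```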
