Commit f6b4436

mjgatz, michael-greene-gravie, and lazebnyi authored
✨ Source File: add fixed width file format support (#34678)
Co-authored-by: mgreene <[email protected]>
Co-authored-by: Serhii Lazebnyi <[email protected]>
Co-authored-by: Serhii Lazebnyi <[email protected]>
1 parent 462970f commit f6b4436

15 files changed: +183 -88 lines changed

airbyte-integrations/connectors/source-file/.coveragerc (new file, +3)

@@ -0,0 +1,3 @@
+[run]
+omit =
+    source_file/run.py

airbyte-integrations/connectors/source-file/acceptance-test-config.yml (+3)

@@ -28,6 +28,9 @@ acceptance_tests:
         extra_fields: no
         exact_order: no
         extra_records: yes
+        file_types:
+          skip_test: yes
+          bypass_reason: "Source is not based on file based CDK"
   full_refresh:
     tests:
       - config_path: "integration_tests/config.json"

airbyte-integrations/connectors/source-file/integration_tests/cloud_spec.json (+1)

@@ -20,6 +20,7 @@
         "jsonl",
         "excel",
         "excel_binary",
+        "fwf",
         "feather",
         "parquet",
         "yaml"

airbyte-integrations/connectors/source-file/integration_tests/file_formats_test.py (+1)

@@ -30,6 +30,7 @@ def check_read(config, expected_columns=10, expected_rows=42):
         ("jsonl", "jsonl", 2, 6492, "jsonl"),
         ("excel", "xls", 8, 50, "demo"),
         ("excel", "xlsx", 8, 50, "demo"),
+        ("fwf", "txt", 4, 2, "demo"),
         ("feather", "feather", 9, 3, "demo"),
         ("parquet", "parquet", 9, 3, "demo"),
         ("yaml", "yaml", 8, 3, "demo"),

@@ -0,0 +1,25 @@
+{
+  "streams": [
+    {
+      "stream": {
+        "name": "test",
+        "json_schema": {
+          "$schema": "http://json-schema.org/draft-07/schema#",
+          "type": "object",
+          "properties": {
+            "$schema": "http://json-schema.org/schema#",
+            "type": "object",
+            "properties": {
+              "text": { "type": "string" },
+              "num": { "type": "number" },
+              "float": { "type": "number" },
+              "bool": { "type": "string" }
+            }
+          }
+        }
+      },
+      "sync_mode": "full_refresh",
+      "destination_sync_mode": "overwrite"
+    }
+  ]
+}

@@ -0,0 +1,3 @@
+text      num  float  bool
+short     1    0.2    true
+long_text 33   0.0    false
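
Worth noting why this fixture parses cleanly: pandas' read_fwf infers column boundaries from runs of whitespace by default (colspecs="infer"), so the three lines above come out as 4 named columns and 2 data rows, exactly what the new ("fwf", "txt", 4, 2, "demo") case in file_formats_test.py asserts. A minimal, self-contained sketch under default reader options:

import io

import pandas as pd

# Same shape as the demo fixture above; the column alignment is what
# read_fwf keys on (colspecs="infer" is the default).
data = (
    "text      num  float  bool\n"
    "short     1    0.2    true\n"
    "long_text 33   0.0    false\n"
)

df = pd.read_fwf(io.StringIO(data))
assert list(df.columns) == ["text", "num", "float", "bool"]
assert df.shape == (2, 4)  # 2 rows x 4 columns, per the integration test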

airbyte-integrations/connectors/source-file/metadata.yaml (+1 -1)

@@ -10,7 +10,7 @@ data:
   connectorSubtype: file
   connectorType: source
   definitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
-  dockerImageTag: 0.3.16
+  dockerImageTag: 0.3.17
   dockerRepository: airbyte/source-file
   documentationUrl: https://docs.airbyte.com/integrations/sources/file
   githubIssueLabel: source-file

airbyte-integrations/connectors/source-file/source_file/client.py (+6 -4)

@@ -172,6 +172,7 @@ def storage_scheme(self) -> str:
         """
         storage_name = self._provider["storage"].upper()
         parse_result = urlparse(self._url)
+
         if storage_name == "GCS":
             return "gs://"
         elif storage_name == "S3":
@@ -191,7 +192,7 @@ def storage_scheme(self) -> str:
         elif parse_result.scheme:
             return parse_result.scheme

-        logger.error(f"Unknown Storage provider in: {self.full_url}")
+        logger.error(f"Unknown Storage provider in: {self._url}")
         return ""

     def _open_gcs_url(self) -> object:
@@ -328,6 +329,7 @@ def load_dataframes(self, fp, skip_data=False, read_sample_chunk: bool = False)
             "html": pd.read_html,
             "excel": pd.read_excel,
             "excel_binary": pd.read_excel,
+            "fwf": pd.read_fwf,
             "feather": pd.read_feather,
             "parquet": pd.read_parquet,
             "orc": pd.read_orc,
@@ -354,9 +356,9 @@ def load_dataframes(self, fp, skip_data=False, read_sample_chunk: bool = False)
                     yield record
                     if read_sample_chunk and bytes_read >= self.CSV_CHUNK_SIZE:
                         return
-        elif self._reader_options == "excel_binary":
+        elif self._reader_format == "excel_binary":
             reader_options["engine"] = "pyxlsb"
-            yield from reader(fp, **reader_options)
+            yield reader(fp, **reader_options)
         elif self._reader_format == "excel":
             # Use openpyxl to read new-style Excel (xlsx) file; return to pandas for others
             try:
@@ -483,7 +485,7 @@ def streams(self, empty_schema: bool = False) -> Iterable:

     def openpyxl_chunk_reader(self, file, **kwargs):
         """Use openpyxl lazy loading feature to read excel files (xlsx only) in chunks of 500 lines at a time"""
-        work_book = load_workbook(filename=file, read_only=True)
+        work_book = load_workbook(filename=file)
         user_provided_column_names = kwargs.get("names")
         for sheetname in work_book.sheetnames:
             work_sheet = work_book[sheetname]
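
Two of the client.py fixes above are easy to miss. The old excel_binary branch compared self._reader_options (a dict of reader options) to a string, so it could never match; the fix compares self._reader_format. And yield from reader(fp, **reader_options) iterated over the DataFrame that pd.read_excel returns, and iterating a pandas DataFrame yields its column labels rather than the frame itself, so downstream consumers received strings instead of a DataFrame. A short standalone sketch of that second pitfall (illustrative, not connector code):

import pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})

def wrong():
    # Iterating a DataFrame yields its column labels ("a", "b"), not rows.
    yield from df

def right():
    # Callers of load_dataframes expect whole DataFrames, one per yield.
    yield df

assert list(wrong()) == ["a", "b"]
assert list(right())[0].equals(df)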

airbyte-integrations/connectors/source-file/source_file/source.py (+23 -1)

@@ -16,6 +16,7 @@
     AirbyteConnectionStatus,
     AirbyteMessage,
     AirbyteRecordMessage,
+    AirbyteStreamStatus,
     ConfiguredAirbyteCatalog,
     ConnectorSpecification,
     FailureType,
@@ -24,6 +25,7 @@
 )
 from airbyte_cdk.sources import Source
 from airbyte_cdk.utils import AirbyteTracedException, is_cloud_environment
+from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message

 from .client import Client
 from .utils import LOCAL_STORAGE_NAME, dropbox_force_download
@@ -61,6 +63,7 @@ class SourceFile(Source):
     - read_json
     - read_html
     - read_excel
+    - read_fwf
     - read_feather
     - read_parquet
     - read_orc
@@ -170,14 +173,33 @@ def read(
         fields = self.selected_fields(catalog, config)
         name = client.stream_name

-        logger.info(f"Reading {name} ({client.reader.full_url})...")
+        configured_stream = catalog.streams[0]
+
+        logger.info(f"Syncing stream: {name} ({client.reader.full_url})...")
+
+        yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.STARTED)
+
+        record_counter = 0
         try:
             for row in client.read(fields=fields):
                 record = AirbyteRecordMessage(stream=name, data=row, emitted_at=int(datetime.now().timestamp()) * 1000)
+
+                record_counter += 1
+                if record_counter == 1:
+                    logger.info(f"Marking stream {name} as RUNNING")
+                    yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.RUNNING)
+
                 yield AirbyteMessage(type=Type.RECORD, record=record)
+
+            logger.info(f"Marking stream {name} as STOPPED")
+            yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.COMPLETE)
+
         except Exception as err:
             reason = f"Failed to read data of {name} at {client.reader.full_url}: {repr(err)}\n{traceback.format_exc()}"
             logger.error(reason)
+            logger.exception(f"Encountered an exception while reading stream {name}")
+            logger.info(f"Marking stream {name} as STOPPED")
+            yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE)
             raise err

     @staticmethod
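
With this change, read() interleaves stream-status TRACE messages with RECORD messages: STARTED before any data, RUNNING at the first record, then COMPLETE on success or INCOMPLETE on failure. Consumers that previously assumed every yielded message carried a record now have to filter by message type, which is exactly what the unit-test updates below do. A minimal sketch of consuming the mixed stream (a helper written for illustration, not part of this commit):

from airbyte_cdk.models import Type

def split_messages(messages):
    """Separate RECORD payloads from the interleaved status messages."""
    records, statuses = [], []
    for message in messages:
        if message.type == Type.RECORD:
            records.append(message.record.data)
        else:
            statuses.append(message)  # STARTED / RUNNING / COMPLETE land here
    return records, statuses

# Usage, with source/config/catalog fixtures as in the connector's tests:
#   records, statuses = split_messages(source.read(logger=logger, config=config, catalog=catalog))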

airbyte-integrations/connectors/source-file/source_file/spec.json (+1)

@@ -20,6 +20,7 @@
         "jsonl",
         "excel",
         "excel_binary",
+        "fwf",
         "feather",
         "parquet",
         "yaml"

airbyte-integrations/connectors/source-file/unit_tests/test_client.py (+44 -15)

@@ -7,9 +7,10 @@

 import pytest
 from airbyte_cdk.utils import AirbyteTracedException
-from pandas import read_csv, read_excel
+from pandas import read_csv, read_excel, testing
 from paramiko import SSHException
 from source_file.client import Client, URLFile
+from source_file.utils import backoff_handler
 from urllib3.exceptions import ProtocolError


@@ -34,21 +35,22 @@ def csv_format_client():


 @pytest.mark.parametrize(
-    "storage, expected_scheme",
+    "storage, expected_scheme, url",
     [
-        ("GCS", "gs://"),
-        ("S3", "s3://"),
-        ("AZBLOB", "azure://"),
-        ("HTTPS", "https://"),
-        ("SSH", "scp://"),
-        ("SCP", "scp://"),
-        ("SFTP", "sftp://"),
-        ("WEBHDFS", "webhdfs://"),
-        ("LOCAL", "file://"),
+        ("GCS", "gs://", "http://localhost"),
+        ("S3", "s3://", "http://localhost"),
+        ("AZBLOB", "azure://", "http://localhost"),
+        ("HTTPS", "https://", "http://localhost"),
+        ("SSH", "scp://", "http://localhost"),
+        ("SCP", "scp://", "http://localhost"),
+        ("SFTP", "sftp://", "http://localhost"),
+        ("WEBHDFS", "webhdfs://", "http://localhost"),
+        ("LOCAL", "file://", "http://localhost"),
+        ("WRONG", "", ""),
     ],
 )
-def test_storage_scheme(storage, expected_scheme):
-    urlfile = URLFile(provider={"storage": storage}, url="http://localhost")
+def test_storage_scheme(storage, expected_scheme, url):
+    urlfile = URLFile(provider={"storage": storage}, url=url)
     assert urlfile.storage_scheme == expected_scheme


@@ -80,8 +82,27 @@ def test_load_dataframes_xlsb(config, absolute_path, test_files):
     assert read_file.equals(expected)


-def test_load_nested_json(client, absolute_path, test_files):
-    f = f"{absolute_path}/{test_files}/formats/json/demo.json"
+@pytest.mark.parametrize("file_name, should_raise_error", [("test.xlsx", False), ("test_one_line.xlsx", True)])
+def test_load_dataframes_xlsx(config, absolute_path, test_files, file_name, should_raise_error):
+    config["format"] = "excel"
+    client = Client(**config)
+    f = f"{absolute_path}/{test_files}/{file_name}"
+    if should_raise_error:
+        with pytest.raises(AirbyteTracedException):
+            next(client.load_dataframes(fp=f))
+    else:
+        read_file = next(client.load_dataframes(fp=f))
+        expected = read_excel(f, engine="openpyxl")
+        assert read_file.equals(expected)
+
+
+@pytest.mark.parametrize("file_format, file_path", [("json", "formats/json/demo.json"),
+                                                    ("jsonl", "formats/jsonl/jsonl_nested.jsonl")])
+def test_load_nested_json(client, config, absolute_path, test_files, file_format, file_path):
+    if file_format == "jsonl":
+        config["format"] = file_format
+        client = Client(**config)
+    f = f"{absolute_path}/{test_files}/{file_path}"
     with open(f, mode="rb") as file:
         assert client.load_nested_json(fp=file)

@@ -189,3 +210,11 @@ def patched_open(self):
     assert call_count == 7

     assert sleep_mock.call_count == 5
+
+
+def test_backoff_handler(caplog):
+    details = {"tries": 1, "wait": 1}
+    backoff_handler(details)
+    expected = [('airbyte', 20, 'Caught retryable error after 1 tries. Waiting 1 seconds then retrying...')]
+
+    assert caplog.record_tuples == expected
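
The new test pins backoff_handler's output to the "airbyte" logger at level 20 (INFO). The handler's body is not part of this diff; a plausible reconstruction consistent with the asserted log record (an assumption, not the committed code) would be:

import logging

logger = logging.getLogger("airbyte")

def backoff_handler(details):
    # `details` follows the backoff library's handler contract; the test
    # exercises the "tries" and "wait" keys.
    logger.info(f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying...")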

airbyte-integrations/connectors/source-file/unit_tests/test_source.py (+9 -6)

@@ -21,6 +21,7 @@
     Type,
 )
 from airbyte_cdk.utils import AirbyteTracedException
+from airbyte_protocol.models.airbyte_protocol import Type as MessageType
 from source_file.source import SourceFile

 logger = logging.getLogger("airbyte")
@@ -95,7 +96,8 @@ def test_nan_to_null(absolute_path, test_files):

     source = SourceFile()
     records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
-    records = [r.record.data for r in records]
+
+    records = [r.record.data for r in records if r.type == MessageType.RECORD]
     assert records == [
         {"col1": "key1", "col2": 1.11, "col3": None},
         {"col1": "key2", "col2": None, "col3": 2.22},
@@ -105,13 +107,14 @@ def test_nan_to_null(absolute_path, test_files):

     config.update({"format": "yaml", "url": f"{absolute_path}/{test_files}/formats/yaml/demo.yaml"})
     records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
-    records = [r.record.data for r in records]
+    records = [r.record.data for r in records if r.type == MessageType.RECORD]
     assert records == []

     config.update({"provider": {"storage": "SSH", "user": "user", "host": "host"}})

     with pytest.raises(Exception):
-        next(source.read(logger=logger, config=config, catalog=catalog))
+        for record in source.read(logger=logger, config=config, catalog=catalog):
+            pass


 def test_spec(source):
@@ -176,7 +179,7 @@ def test_pandas_header_not_none(absolute_path, test_files):

     source = SourceFile()
     records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
-    records = [r.record.data for r in records]
+    records = [r.record.data for r in records if r.type == MessageType.RECORD]
     assert records == [
         {"text11": "text21", "text12": "text22"},
     ]
@@ -195,7 +198,7 @@ def test_pandas_header_none(absolute_path, test_files):

     source = SourceFile()
     records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
-    records = [r.record.data for r in records]
+    records = [r.record.data for r in records if r.type == MessageType.RECORD]
     assert records == [
         {"0": "text11", "1": "text12"},
         {"0": "text21", "1": "text22"},
@@ -224,4 +227,4 @@ def test_incorrect_reader_options(absolute_path, test_files):
     ):
         catalog = get_catalog({"0": {"type": ["string", "null"]}, "1": {"type": ["string", "null"]}})
         records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
-        records = [r.record.data for r in records]
+        records = [r.record.data for r in records if r.type == MessageType.RECORD]
