Skip to content

Commit 44ac470

Browse files
mdibaieevincentkococtavia-squidington-iii
authored
Google Sheets: add row_id to rows and use as primary key (#19215)
* source-google-sheets: add row_id to rows and use as primary key * Update Dockerfile * Update google-sheets.md * Update Dockerfile * Update google-sheets.md * auto-bump connector version Co-authored-by: Vincent Koc <[email protected]> Co-authored-by: Octavia Squidington III <[email protected]>
1 parent d59275d commit 44ac470

File tree

7 files changed

+43
-14
lines changed

7 files changed

+43
-14
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@
574574
- name: Google Sheets
575575
sourceDefinitionId: 71607ba1-c0ac-4799-8049-7f4b90dd50f7
576576
dockerRepository: airbyte/source-google-sheets
577-
dockerImageTag: 0.2.21
577+
dockerImageTag: 0.2.30
578578
documentationUrl: https://docs.airbyte.com/integrations/sources/google-sheets
579579
icon: google-sheets.svg
580580
sourceType: file

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5393,7 +5393,7 @@
53935393
oauthFlowOutputParameters:
53945394
- - "access_token"
53955395
- - "refresh_token"
5396-
- dockerImage: "airbyte/source-google-sheets:0.2.21"
5396+
- dockerImage: "airbyte/source-google-sheets:0.2.30"
53975397
spec:
53985398
documentationUrl: "https://docs.airbyte.com/integrations/sources/google-sheets"
53995399
connectionSpecification:

airbyte-integrations/connectors/source-google-sheets/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,5 @@ COPY google_sheets_source ./google_sheets_source
3434
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
3535
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
3636

37-
LABEL io.airbyte.version=0.2.21
37+
LABEL io.airbyte.version=0.2.30
3838
LABEL io.airbyte.name=airbyte/source-google-sheets

airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,15 @@ def read(
169169
if len(row_values) == 0:
170170
break
171171

172+
row_id = row_cursor
172173
for row in row_values:
173174
if not Helpers.is_row_empty(row) and Helpers.row_contains_relevant_data(row, column_index_to_name.keys()):
174-
yield AirbyteMessage(type=Type.RECORD, record=Helpers.row_data_to_record_message(sheet, row, column_index_to_name))
175+
yield AirbyteMessage(
176+
type=Type.RECORD, record=Helpers.row_data_to_record_message(sheet, row_id, row, column_index_to_name)
177+
)
178+
row_id += 1
179+
180+
row_cursor += ROW_BATCH_SIZE + 1
175181
logger.info(f"Finished syncing spreadsheet {spreadsheet_id}")
176182

177183
@staticmethod

airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,22 @@ def headers_to_airbyte_stream(logger: AirbyteLogger, sheet_name: str, header_row
5353
if duplicate_fields:
5454
logger.warn(f"Duplicate headers found in {sheet_name}. Ignoring them :{duplicate_fields}")
5555

56+
props = {field: {"type": "string"} for field in fields}
57+
props["row_id"] = {"type": "integer"}
5658
sheet_json_schema = {
5759
"$schema": "http://json-schema.org/draft-07/schema#",
5860
"type": "object",
61+
"required": ["row_id"],
5962
# For simplicity, the type of every cell is a string
60-
"properties": {field: {"type": "string"} for field in fields},
63+
"properties": props,
6164
}
6265

63-
return AirbyteStream(name=sheet_name, json_schema=sheet_json_schema, supported_sync_modes=[SyncMode.full_refresh])
66+
return AirbyteStream(
67+
name=sheet_name,
68+
json_schema=sheet_json_schema,
69+
supported_sync_modes=[SyncMode.full_refresh],
70+
source_defined_primary_key=[["row_id"]],
71+
)
6472

6573
@staticmethod
6674
def get_valid_headers_and_duplicates(header_row_values: List[str]) -> (List[str], List[str]):
@@ -121,8 +129,10 @@ def parse_sheet_and_column_names_from_catalog(catalog: ConfiguredAirbyteCatalog)
121129
return sheet_to_column_name
122130

123131
@staticmethod
124-
def row_data_to_record_message(sheet_name: str, cell_values: List[str], column_index_to_name: Dict[int, str]) -> AirbyteRecordMessage:
125-
data = {}
132+
def row_data_to_record_message(
133+
sheet_name: str, row_id: int, cell_values: List[str], column_index_to_name: Dict[int, str]
134+
) -> AirbyteRecordMessage:
135+
data = {"row_id": row_id}
126136
for relevant_index in sorted(column_index_to_name.keys()):
127137
if relevant_index >= len(cell_values):
128138
break

airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,18 @@ def test_headers_to_airbyte_stream(self):
2727
sheet_name = "sheet1"
2828
header_values = ["h1", "h2", "h3"]
2929

30+
props = {header: {"type": "string"} for header in header_values}
31+
props["row_id"] = {"type": "integer"}
3032
expected_stream = AirbyteStream(
3133
name=sheet_name,
3234
json_schema={
3335
"$schema": "http://json-schema.org/draft-07/schema#",
3436
"type": "object",
37+
"required": ["row_id"],
3538
# For simplicity, the type of every cell is a string
36-
"properties": {header: {"type": "string"} for header in header_values},
39+
"properties": props,
3740
},
41+
source_defined_primary_key=[["row_id"]],
3842
supported_sync_modes=[SyncMode.full_refresh],
3943
)
4044

@@ -57,15 +61,20 @@ def test_duplicate_headers_to_ab_stream_ignores_duplicates(self):
5761
header_values = ["h1", "h1", "h3"]
5862

5963
# h1 is ignored because it is duplicate
60-
expected_stream_header_values = ["h3"]
64+
props = {
65+
"h3": {"type": "string"},
66+
"row_id": {"type": "integer"},
67+
}
6168
expected_stream = AirbyteStream(
6269
name=sheet_name,
6370
json_schema={
6471
"$schema": "http://json-schema.org/draft-07/schema#",
6572
"type": "object",
73+
"required": ["row_id"],
6674
# For simplicity, the type of every cell is a string
67-
"properties": {header: {"type": "string"} for header in expected_stream_header_values},
75+
"properties": props,
6876
},
77+
source_defined_primary_key=[["row_id"]],
6978
supported_sync_modes=[SyncMode.full_refresh],
7079
)
7180

@@ -81,9 +90,11 @@ def test_headers_to_airbyte_stream_blank_values_terminate_row(self):
8190
json_schema={
8291
"$schema": "http://json-schema.org/draft-07/schema#",
8392
"type": "object",
93+
"required": ["row_id"],
8494
# For simplicity, the type of every cell is a string
85-
"properties": {"h1": {"type": "string"}},
95+
"properties": {"h1": {"type": "string"}, "row_id": {"type": "integer"}},
8696
},
97+
source_defined_primary_key=[["row_id"]],
8798
supported_sync_modes=[SyncMode.full_refresh],
8899
)
89100
actual_stream = Helpers.headers_to_airbyte_stream(logger, sheet_name, header_values)
@@ -143,10 +154,11 @@ def test_row_data_to_record_message(self):
143154
sheet = "my_sheet"
144155
cell_values = ["v1", "v2", "v3", "v4"]
145156
column_index_to_name = {0: "c1", 3: "c4"}
157+
row_id = 1
146158

147-
actual = Helpers.row_data_to_record_message(sheet, cell_values, column_index_to_name)
159+
actual = Helpers.row_data_to_record_message(sheet, row_id, cell_values, column_index_to_name)
148160

149-
expected = AirbyteRecordMessage(stream=sheet, data={"c1": "v1", "c4": "v4"}, emitted_at=1)
161+
expected = AirbyteRecordMessage(stream=sheet, data={"row_id": row_id, "c1": "v1", "c4": "v4"}, emitted_at=1)
150162
self.assertEqual(expected.stream, actual.stream)
151163
self.assertEqual(expected.data, actual.data)
152164

docs/integrations/sources/google-sheets.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ The [Google API rate limit](https://developers.google.com/sheets/api/limits) is
7171

7272
| Version | Date | Pull Request | Subject |
7373
| ------- | ---------- | -------------------------------------------------------- | ----------------------------------------------------------------------------- |
74+
| 0.2.30 | 2022-10-09 | [](https://github.com/airbytehq/airbyte/pull/) | Add row_id to rows and use as primary key |
7475
| 0.2.21 | 2022-10-04 | [15591](https://github.com/airbytehq/airbyte/pull/15591) | Clean instantiation of AirbyteStream |
7576
| 0.2.20 | 2022-10-10 | [17766](https://github.com/airbytehq/airbyte/pull/17766) | Fix null pointer exception when parsing the spreadsheet id. |
7677
| 0.2.19 | 2022-09-29 | [17410](https://github.com/airbytehq/airbyte/pull/17410) | Use latest CDK. |

0 commit comments

Comments
 (0)