Commit e19e634
Snowflake Cortex destination: Bug fixes (#38206)
1 parent 5ecaef0 commit e19e634

10 files changed: +77 −22 lines changed

airbyte-integrations/connectors/destination-snowflake-cortex/destination_snowflake_cortex/config.py

+7-6
@@ -17,6 +17,7 @@ class PasswordBasedAuthorizationModel(BaseModel):
         airbyte_secret=True,
         description="Enter the password you want to use to access the database",
         examples=["AIRBYTE_PASSWORD"],
+        order=7,
     )
 
     class Config:
@@ -28,42 +29,42 @@ class SnowflakeCortexIndexingModel(BaseModel):
     host: str = Field(
         ...,
         title="Host",
-        airbyte_secret=True,
+        order=1,
         description="Enter the account name you want to use to access the database. This is usually the identifier before .snowflakecomputing.com",
         examples=["AIRBYTE_ACCOUNT"],
     )
     role: str = Field(
         ...,
         title="Role",
-        airbyte_secret=True,
+        order=2,
         description="Enter the role that you want to use to access Snowflake",
         examples=["AIRBYTE_ROLE", "ACCOUNTADMIN"],
     )
     warehouse: str = Field(
         ...,
         title="Warehouse",
-        airbyte_secret=True,
+        order=3,
         description="Enter the name of the warehouse that you want to sync data into",
         examples=["AIRBYTE_WAREHOUSE"],
     )
     database: str = Field(
         ...,
         title="Database",
-        airbyte_secret=True,
+        order=4,
         description="Enter the name of the database that you want to sync data into",
         examples=["AIRBYTE_DATABASE"],
     )
     default_schema: str = Field(
         ...,
         title="Default Schema",
-        airbyte_secret=True,
+        order=5,
         description="Enter the name of the default schema",
         examples=["AIRBYTE_SCHEMA"],
     )
     username: str = Field(
         ...,
         title="Username",
-        airbyte_secret=True,
+        order=6,
         description="Enter the name of the user you want to use to access the database",
         examples=["AIRBYTE_USER"],
     )
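
The airbyte_secret/order swap above changes the generated connector spec because Pydantic forwards unknown Field keyword arguments straight into the JSON schema that Airbyte renders as the configuration form; dropping airbyte_secret also stops masking plain identifiers like host and role as secrets. A minimal sketch of that pass-through, assuming Pydantic v1 semantics (the model below is illustrative, not connector code):

from pydantic import BaseModel, Field  # assumes Pydantic v1 schema generation

class IndexingModel(BaseModel):
    # Unknown kwargs such as order and airbyte_secret fall through into the
    # JSON schema emitted for each field.
    host: str = Field(..., title="Host", order=1)
    password: str = Field(..., title="Password", airbyte_secret=True, order=7)

print(IndexingModel.schema()["properties"]["host"])
# expected: {'title': 'Host', 'order': 1, 'type': 'string'}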

airbyte-integrations/connectors/destination-snowflake-cortex/destination_snowflake_cortex/destination.py

+1-1
@@ -16,7 +16,7 @@
 from destination_snowflake_cortex.config import ConfigModel
 from destination_snowflake_cortex.indexer import SnowflakeCortexIndexer
 
-BATCH_SIZE = 32
+BATCH_SIZE = 150
 
 
 class DestinationSnowflakeCortex(Destination):
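
BATCH_SIZE is the number of records buffered before a batch is handed off for processing and indexing, so raising it from 32 to 150 means fewer, larger flushes per sync. A rough sketch of that buffering pattern (illustrative only; the actual batching lives in the CDK writer, and flush here is a hypothetical callback):

from typing import Callable, Iterable, List

BATCH_SIZE = 150

def write_in_batches(records: Iterable[dict], flush: Callable[[List[dict]], None]) -> None:
    """Buffer records and flush whenever BATCH_SIZE is reached."""
    buffer: List[dict] = []
    for record in records:
        buffer.append(record)
        if len(buffer) >= BATCH_SIZE:
            flush(buffer)
            buffer = []
    if buffer:  # flush whatever is left at end of stream
        flush(buffer)

write_in_batches(({"i": i} for i in range(400)), lambda b: print(len(b)))  # 150, 150, 100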

airbyte-integrations/connectors/destination-snowflake-cortex/destination_snowflake_cortex/indexer.py

+18-4
@@ -2,6 +2,7 @@
 # Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 #
 
+import copy
 import uuid
 from typing import Any, Iterable, Optional
 
@@ -85,7 +86,7 @@ def _get_updated_catalog(self) -> ConfiguredAirbyteCatalog:
         metadata -> metadata of the record
         embedding -> embedding of the document content
         """
-        updated_catalog = self.catalog
+        updated_catalog = copy.deepcopy(self.catalog)
         # update each stream in the catalog
         for stream in updated_catalog.streams:
             # TO-DO: Revisit this - Clear existing properties, if anys, since we are not entirely sure what's in the configured catalog.
@@ -144,7 +145,8 @@ def get_write_strategy(self, stream_name: str) -> WriteStrategy:
         for stream in self.catalog.streams:
             if stream.stream.name == stream_name:
                 if stream.destination_sync_mode == DestinationSyncMode.overwrite:
-                    return WriteStrategy.REPLACE
+                    # we will use append here since we will remove the existing records and add new ones.
+                    return WriteStrategy.APPEND
                 if stream.destination_sync_mode == DestinationSyncMode.append:
                     return WriteStrategy.APPEND
                 if stream.destination_sync_mode == DestinationSyncMode.append_dedup:
@@ -170,10 +172,22 @@ def index(self, document_chunks: Iterable[Any], namespace: str, stream: str):
         cortex_processor.process_airbyte_messages(airbyte_messages, self.get_write_strategy(stream))
 
     def delete(self, delete_ids: list[str], namespace: str, stream: str):
-        # delete is generally used when we use full refresh/overwrite strategy.
-        # PyAirbyte's sync will take care of overwriting the records. Hence, we don't need to do anything here.
+        # this delete is specific to vector stores, hence not implemented here
         pass
 
+    def pre_sync(self, catalog: ConfiguredAirbyteCatalog) -> None:
+        """
+        Run before the sync starts. This method makes sure that all records in the destination that belong to streams with a destination mode of overwrite are deleted.
+        """
+        table_list = self.default_processor._get_tables_list()
+        for stream in catalog.streams:
+            # remove all records for streams with overwrite mode
+            if stream.destination_sync_mode == DestinationSyncMode.overwrite:
+                stream_name = stream.stream.name
+                if stream_name.lower() in [table.lower() for table in table_list]:
+                    self.default_processor._execute_sql(f"DELETE FROM {stream_name}")
+        pass
+
     def check(self) -> Optional[str]:
         self.default_processor._get_tables_list()
         # TODO: check to see if vector type is available in snowflake instance
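
Taken together, these hunks re-implement overwrite as "truncate once, then append": pre_sync issues one DELETE FROM per overwrite stream whose table already exists (matched case-insensitively, since Snowflake folds unquoted identifiers to upper case), and get_write_strategy then appends every batch instead of replacing the table batch by batch. The resulting mapping, sketched with a stand-in enum (PyAirbyte defines the real WriteStrategy):

from enum import Enum

class WriteStrategy(str, Enum):  # stand-in for PyAirbyte's enum of the same name
    APPEND = "append"
    MERGE = "merge"

# destination_sync_mode -> write strategy after this commit
SYNC_MODE_TO_STRATEGY = {
    "overwrite": WriteStrategy.APPEND,      # table is emptied once in pre_sync
    "append": WriteStrategy.APPEND,
    "append_dedup": WriteStrategy.MERGE,    # merge dedups on the primary key
}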

airbyte-integrations/connectors/destination-snowflake-cortex/integration_tests/integration_test.py

+16
@@ -124,6 +124,22 @@ def test_write(self):
         assert(len(result) == 1)
         result[0] == "str_col: Cats are nice"
 
+
+    def test_overwrite_mode_deletes_records(self):
+        self._delete_table("mystream")
+        catalog = self._get_configured_catalog(DestinationSyncMode.overwrite)
+        first_state_message = self._state({"state": "1"})
+        first_record_chunk = [self._record("mystream", f"Dogs are number {i}", i) for i in range(4)]
+
+        # initial sync with replace
+        destination = DestinationSnowflakeCortex()
+        list(destination.write(self.config, catalog, [*first_record_chunk, first_state_message]))
+        assert(self._get_record_count("mystream") == 4)
+
+        # following should replace existing records
+        append_catalog = self._get_configured_catalog(DestinationSyncMode.overwrite)
+        list(destination.write(self.config, append_catalog, [self._record("mystream", "Cats are nice", 6), first_state_message]))
+        assert(self._get_record_count("mystream") == 1)
 
     """
     Following tests are not code specific, but are useful to confirm that the Cortex functions are available and behaving as expcected
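
The new test drives two consecutive overwrite syncs and asserts that the second sync replaces, rather than extends, the first four records. A plausible shape for the _get_record_count helper the assertions rely on (hypothetical sketch; the real helper lives elsewhere in integration_test.py, and _get_db_connection is an assumed companion helper):

def _get_record_count(self, table_name: str) -> int:
    # Count rows with a plain cursor query against the test database.
    with self._get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        return cursor.fetchone()[0]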

airbyte-integrations/connectors/destination-snowflake-cortex/integration_tests/spec.json

+7-6
@@ -319,42 +319,42 @@
         "host": {
           "title": "Host",
           "description": "Enter the account name you want to use to access the database. This is usually the identifier before .snowflakecomputing.com",
-          "airbyte_secret": true,
+          "order": 1,
           "examples": ["AIRBYTE_ACCOUNT"],
           "type": "string"
         },
         "role": {
           "title": "Role",
           "description": "Enter the role that you want to use to access Snowflake",
-          "airbyte_secret": true,
+          "order": 2,
           "examples": ["AIRBYTE_ROLE", "ACCOUNTADMIN"],
           "type": "string"
         },
         "warehouse": {
           "title": "Warehouse",
           "description": "Enter the name of the warehouse that you want to sync data into",
-          "airbyte_secret": true,
+          "order": 3,
           "examples": ["AIRBYTE_WAREHOUSE"],
           "type": "string"
         },
         "database": {
           "title": "Database",
           "description": "Enter the name of the database that you want to sync data into",
-          "airbyte_secret": true,
+          "order": 4,
           "examples": ["AIRBYTE_DATABASE"],
           "type": "string"
         },
         "default_schema": {
           "title": "Default Schema",
           "description": "Enter the name of the default schema",
-          "airbyte_secret": true,
+          "order": 5,
           "examples": ["AIRBYTE_SCHEMA"],
           "type": "string"
         },
         "username": {
           "title": "Username",
           "description": "Enter the name of the user you want to use to access the database",
-          "airbyte_secret": true,
+          "order": 6,
           "examples": ["AIRBYTE_USER"],
           "type": "string"
         },
@@ -367,6 +367,7 @@
           "description": "Enter the password you want to use to access the database",
           "airbyte_secret": true,
           "examples": ["AIRBYTE_PASSWORD"],
+          "order": 7,
           "type": "string"
         }
       },

airbyte-integrations/connectors/destination-snowflake-cortex/metadata.yaml

+1-1
@@ -13,7 +13,7 @@ data:
   connectorSubtype: vectorstore
   connectorType: destination
   definitionId: d9e5418d-f0f4-4d19-a8b1-5630543638e2
-  dockerImageTag: 0.1.0
+  dockerImageTag: 0.1.1
   dockerRepository: airbyte/destination-snowflake-cortex
   documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake-cortex
   githubIssueLabel: destination-snowflake-cortex

airbyte-integrations/connectors/destination-snowflake-cortex/pyproject.toml

+1-1
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
 name = "airbyte-destination-snowflake-cortex"
-version = "0.1.0"
+version = "0.1.1"
 description = "Airbyte destination implementation for Snowflake cortex."
 authors = ["Airbyte <[email protected]>"]
 license = "MIT"

airbyte-integrations/connectors/destination-snowflake-cortex/unit_tests/destination_test.py

+1-1
@@ -81,5 +81,5 @@ def test_write(self, MockedEmbedder, MockedSnowflakeCortexIndexer, MockedWriter)
         destination = DestinationSnowflakeCortex()
         list(destination.write(self.config, configured_catalog, input_messages))
 
-        MockedWriter.assert_called_once_with(self.config_model.processing, mock_indexer, mock_embedder, batch_size=32, omit_raw_text=False)
+        MockedWriter.assert_called_once_with(self.config_model.processing, mock_indexer, mock_embedder, batch_size=150, omit_raw_text=False)
         mock_writer.write.assert_called_once_with(configured_catalog, input_messages)

airbyte-integrations/connectors/destination-snowflake-cortex/unit_tests/indexer_test.py

+23-1
@@ -135,7 +135,7 @@ def test_create_state_message():
 def test_get_write_strategy():
     indexer = _create_snowflake_cortex_indexer(generate_catalog())
     assert(indexer.get_write_strategy('example_stream') == WriteStrategy.MERGE)
-    assert(indexer.get_write_strategy('example_stream2') == WriteStrategy.REPLACE)
+    assert(indexer.get_write_strategy('example_stream2') == WriteStrategy.APPEND)
     assert(indexer.get_write_strategy('example_stream3') == WriteStrategy.APPEND)
 
 def test_get_document_id():
@@ -184,6 +184,28 @@ def test_check():
     assert result == None
 
 
+def test_pre_sync_table_does_exist():
+    indexer = _create_snowflake_cortex_indexer(generate_catalog())
+    mock_processor = MagicMock()
+    indexer.default_processor = mock_processor
+
+    mock_processor._get_tables_list.return_value = ["table1", "table2"]
+    mock_processor._execute_query.return_value = None
+    indexer.pre_sync(generate_catalog())
+    mock_processor._get_tables_list.assert_called_once()
+    mock_processor._execute_sql.assert_not_called()
+
+def test_pre_sync_table_exists():
+    indexer = _create_snowflake_cortex_indexer(generate_catalog())
+    mock_processor = MagicMock()
+    indexer.default_processor = mock_processor
+
+    mock_processor._get_tables_list.return_value = ["example_stream2", "table2"]
+    mock_processor._execute_query.return_value = None
+    indexer.pre_sync(generate_catalog())
+    mock_processor._get_tables_list.assert_called_once()
+    mock_processor._execute_sql.assert_called_once()
+
 def generate_catalog():
     return ConfiguredAirbyteCatalog.parse_obj(
         {
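
Both tests swap the indexer's default_processor for a MagicMock, so pre_sync is verified purely through recorded calls: with no matching table, _execute_sql must never fire; with example_stream2 present, it must fire exactly once. A minimal standalone sketch of that pattern, using only the stdlib:

from unittest.mock import MagicMock

mock_processor = MagicMock()
mock_processor._get_tables_list.return_value = ["example_stream2", "table2"]

# Every attribute on a MagicMock is itself a mock that records its calls.
tables = mock_processor._get_tables_list()
mock_processor._execute_sql("DELETE FROM example_stream2")
mock_processor._execute_sql.assert_called_once_with("DELETE FROM example_stream2")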

docs/integrations/destinations/snowflake-cortex.md

+2-1
@@ -2,7 +2,7 @@
 
 ## Overview
 
-This page guides you through the process of setting up the [Snowflake](https://pinecone.io/) as a vector destination.
+This page guides you through the process of setting up the [Snowflake](https://www.snowflake.com/en/) as a vector destination.
 
 There are three parts to this:
 * Processing - split up individual records in chunks so they will fit the context window and decide which fields to use as context and which are supplementary metadata.
@@ -81,4 +81,5 @@ To get started, sign up for [Snowflake](https://www.snowflake.com/en/). Ensure y
 
 | Version | Date | Pull Request | Subject |
 |:--------| :--------- |:---------------------------------------------------------|:---------------------------------------------------------------------|
+| 0.1.1 | 2024-05-15 | [#38206](https://github.com/airbytehq/airbyte/pull/38206) | Bug fixes.
 | 0.1.0 | 2024-05-13 | [#37333](https://github.com/airbytehq/airbyte/pull/36807) | Add support for Snowflake as a Vector destination.
