Commit 4b84c63

✨ New: Snowflake Cortex Destination 🚀 (#36807)

Co-authored-by: Aaron Steers <[email protected]>

1 parent 100f4e0 commit 4b84c63

22 files changed: +5552 −0 lines changed
.github/CODEOWNERS (+1)

```diff
@@ -4,6 +4,7 @@
 /airbyte-integrations/connectors/destination-milvus @airbytehq/ai-language-models
 /airbyte-integrations/connectors/destination-qdrant @airbytehq/ai-language-models
 /airbyte-integrations/connectors/destination-chroma @airbytehq/ai-language-models
+/airbyte-integrations/connectors/destination-snowflake-cortex @airbytehq/ai-language-models
 /airbyte-cdk/python/airbyte_cdk/destinations/vector_db_based @airbytehq/ai-language-models

 # CI/CD
```
airbyte-integrations/connectors/destination-snowflake-cortex/.dockerignore (new file, +6)

```
*
!Dockerfile
!main.py
!destination_snowflake_cortex
!pyproject.toml
!poetry.lock
```
airbyte-integrations/connectors/destination-snowflake-cortex/README.md (new file, +145)

# Snowflake Cortex Destination

This is the repository for the Snowflake Cortex destination connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/destinations/snowflake-cortex).

## Local development

### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**

#### Minimum Python version required `= 3.9.0`

### Installing the connector
From this connector directory, run:
```bash
poetry install --with dev
```

#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/destinations/snowflake-cortex) to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_snowflake_cortex/spec.json` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.

**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination snowflake-cortex test creds` and place them into `secrets/config.json`.
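
For orientation, here is a hypothetical `secrets/config.json` skeleton. The `indexing` fields mirror `destination_snowflake_cortex/config.py` from this commit; the `embedding` and `processing` sections come from the CDK's shared `VectorDBConfigModel` and, like all values shown, are illustrative only:
```json
{
  "indexing": {
    "host": "MY_ACCOUNT",
    "role": "AIRBYTE_ROLE",
    "warehouse": "AIRBYTE_WAREHOUSE",
    "database": "AIRBYTE_DATABASE",
    "default_schema": "AIRBYTE_SCHEMA",
    "username": "AIRBYTE_USER",
    "credentials": { "password": "PASSWORD" }
  },
  "embedding": { "mode": "cohere", "cohere_key": "MY_COHERE_KEY" },
  "processing": { "chunk_size": 512, "text_fields": ["document_content"] }
}
```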
### Locally running the connector
```
poetry run python main.py spec
poetry run python main.py check --config secrets/config.json
cat examples/messages.jsonl | poetry run python main.py write --config secrets/config.json --catalog integration_tests/configured_catalog.json
```
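
Each line of `examples/messages.jsonl` is one Airbyte protocol message. As an illustration (the stream and field names here are made up), a `RECORD` message looks roughly like:
```json
{"type": "RECORD", "record": {"stream": "mystream", "data": {"id": 1, "text": "a passage to embed"}, "emitted_at": 1714000000000}}
```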
### Locally running the connector docker image

#### Use `airbyte-ci` to build your connector
The Airbyte way of building this connector is to use our `airbyte-ci` tool.
You can follow install instructions [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#L1).
Then running the following command will build your connector:

```bash
airbyte-ci connectors --name destination-snowflake-cortex build
```
Once the command is done, you will find your connector image in your local docker registry: `airbyte/destination-snowflake-cortex:dev`.

##### Customizing our build process
When contributing to our connector you might need to customize the build process to add a system dependency or set an env var.
You can customize our build process by adding a `build_customization.py` module to your connector.
This module should contain `pre_connector_install` and `post_connector_install` async functions that will mutate the base image and the connector container respectively.
It will be imported at runtime by our build process and the functions will be called if they exist.

Here is an example of a `build_customization.py` module:
```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Feel free to check the dagger documentation for more information on the Container object and its methods.
    # https://dagger-io.readthedocs.io/en/sdk-python-v0.6.4/
    from dagger import Container


async def pre_connector_install(base_image_container: Container) -> Container:
    return await base_image_container.with_env_variable("MY_PRE_BUILD_ENV_VAR", "my_pre_build_env_var_value")


async def post_connector_install(connector_container: Container) -> Container:
    return await connector_container.with_env_variable("MY_POST_BUILD_ENV_VAR", "my_post_build_env_var_value")
```

#### Build your own connector image
This connector is built using our dynamic build process in `airbyte-ci`.
The base image used to build it is defined within the `metadata.yaml` file under the `connectorBuildOptions`.
The build logic is defined using [Dagger](https://dagger.io/) [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/pipelines/builds/python_connectors.py).
It does not rely on a Dockerfile.

If you would like to patch our connector and build your own, a simple approach would be to:

1. Create your own Dockerfile based on the latest version of the connector image.
```Dockerfile
FROM airbyte/destination-snowflake-cortex:latest

COPY . ./airbyte/integration_code
RUN pip install ./airbyte/integration_code

# The entrypoint and default env vars are already set in the base image
# ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
# ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
```
Please use this as an example only; it is not optimized.

2. Build your image:
```bash
docker build -t airbyte/destination-snowflake-cortex:dev .
# Running the spec command against your patched connector
docker run airbyte/destination-snowflake-cortex:dev spec
```

#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-snowflake-cortex:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-snowflake-cortex:dev check --config /secrets/config.json
# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-snowflake-cortex:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```

## Testing
You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md):
```bash
airbyte-ci connectors --name=destination-snowflake-cortex test
```

### Unit Tests
To run unit tests locally, from the connector directory run:
```
poetry run pytest -s unit_tests
```

### Integration Tests
There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector).

To run integration tests locally, make sure you have a `secrets/config.json` as explained above, and then run:
```
poetry run pytest -s integration_tests
```

### Customizing Acceptance Tests
Customize the `acceptance-test-config.yml` file to configure tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
If your connector requires creating or destroying resources for use during acceptance tests, create fixtures for them and place them inside `integration_tests/acceptance.py`, as in the sketch below.
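
A minimal sketch of such a fixture (the setup and teardown bodies are placeholders, not taken from this commit):
```python
import pytest


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
    """Set up resources before the acceptance test session and tear them down after."""
    # Hypothetical setup, e.g. create a scratch schema in Snowflake.
    yield
    # Hypothetical teardown, e.g. drop the scratch schema.
```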
### Using `airbyte-ci` to run tests
See the [airbyte-ci documentation](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#connectors-test-command).

## Dependency Management
All of your dependencies should be declared in `pyproject.toml`:
* Dependencies required for your connector to work go in the `[tool.poetry.dependencies]` list.
* Dependencies required for testing go in the `[tool.poetry.group.dev.dependencies]` list.
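
As a sketch of that layout (the package names, extras, and version pins are illustrative, not copied from this commit's `pyproject.toml`):
```toml
[tool.poetry.dependencies]
python = "^3.9"
airbyte-cdk = { version = "*", extras = ["vector-db-based"] }

[tool.poetry.group.dev.dependencies]
pytest = "^7.0"
```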
### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `metadata.yaml` -- just increment the value of `dockerImageTag` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
airbyte-integrations/connectors/destination-snowflake-cortex/acceptance-test-config.yml (new file, +5)

```yaml
acceptance_tests:
  spec:
    tests:
      - spec_path: integration_tests/spec.json
connector_image: airbyte/destination-snowflake-cortex:dev
```
airbyte-integrations/connectors/destination-snowflake-cortex/bootstrap.md (new file, +8)

# Snowflake Cortex Destination Connector Bootstrap

This destination does three things:
* Splits records into chunks and separates metadata from text data
* Embeds the text data into an embedding vector
* Stores the metadata and embedding vector in Snowflake

Record processing uses the text-splitting components from https://python.langchain.com/docs/modules/data_connection/document_transformers/.
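
As a minimal sketch of that splitting step, assuming langchain's `RecursiveCharacterTextSplitter` (the connector's actual splitter and chunk parameters are driven by its processing config):
```python
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Split a record's text into overlapping chunks; each chunk is embedded
# separately and stored in Snowflake alongside the record's metadata.
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50)
chunks = splitter.split_text("a long block of text taken from a synced record")
```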
airbyte-integrations/connectors/destination-snowflake-cortex/destination_snowflake_cortex/__init__.py (new file, +8)

```python
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from .destination import DestinationSnowflakeCortex

__all__ = ["DestinationSnowflakeCortex"]
```
airbyte-integrations/connectors/destination-snowflake-cortex/destination_snowflake_cortex/config.py (new file, +82)

```python
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#


from typing import Literal, Union

from airbyte_cdk.destinations.vector_db_based.config import VectorDBConfigModel
from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
from pydantic import BaseModel, Field


class PasswordBasedAuthorizationModel(BaseModel):
    password: str = Field(
        ...,
        title="Password",
        airbyte_secret=True,
        description="Enter the password you want to use to access the database",
        examples=["AIRBYTE_PASSWORD"],
    )

    class Config:
        title = "Credentials"


# to-do - https://github.com/airbytehq/airbyte/issues/38007 - add Snowflake supported models to embedding options
class SnowflakeCortexIndexingModel(BaseModel):
    host: str = Field(
        ...,
        title="Host",
        airbyte_secret=True,
        description="Enter the account name you want to use to access the database. This is usually the identifier before .snowflakecomputing.com",
        examples=["AIRBYTE_ACCOUNT"],
    )
    role: str = Field(
        ...,
        title="Role",
        airbyte_secret=True,
        description="Enter the role that you want to use to access Snowflake",
        examples=["AIRBYTE_ROLE", "ACCOUNTADMIN"],
    )
    warehouse: str = Field(
        ...,
        title="Warehouse",
        airbyte_secret=True,
        description="Enter the name of the warehouse that you want to sync data into",
        examples=["AIRBYTE_WAREHOUSE"],
    )
    database: str = Field(
        ...,
        title="Database",
        airbyte_secret=True,
        description="Enter the name of the database that you want to sync data into",
        examples=["AIRBYTE_DATABASE"],
    )
    default_schema: str = Field(
        ...,
        title="Default Schema",
        airbyte_secret=True,
        description="Enter the name of the default schema",
        examples=["AIRBYTE_SCHEMA"],
    )
    username: str = Field(
        ...,
        title="Username",
        airbyte_secret=True,
        description="Enter the name of the user you want to use to access the database",
        examples=["AIRBYTE_USER"],
    )

    credentials: PasswordBasedAuthorizationModel

    class Config:
        title = "Indexing"
        schema_extra = {
            "description": "Snowflake can be used to store vector data and retrieve embeddings.",
            "group": "indexing",
        }


class ConfigModel(VectorDBConfigModel):
    indexing: SnowflakeCortexIndexingModel
```
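
As a quick illustration of how this model is consumed (all values are placeholders; the `embedding` and `processing` shapes come from the CDK's `VectorDBConfigModel`, and `"fake"` is assumed here to be one of its embedding modes):
```python
from destination_snowflake_cortex.config import ConfigModel

# Hypothetical raw config, e.g. loaded from secrets/config.json.
raw = {
    "indexing": {
        "host": "MY_ACCOUNT",
        "role": "AIRBYTE_ROLE",
        "warehouse": "AIRBYTE_WAREHOUSE",
        "database": "AIRBYTE_DATABASE",
        "default_schema": "AIRBYTE_SCHEMA",
        "username": "AIRBYTE_USER",
        "credentials": {"password": "PASSWORD"},
    },
    "embedding": {"mode": "fake"},
    "processing": {"chunk_size": 512},
}

parsed = ConfigModel.parse_obj(raw)
print(parsed.indexing.host)  # MY_ACCOUNT
```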
airbyte-integrations/connectors/destination-snowflake-cortex/destination_snowflake_cortex/destination.py (new file, +55)

```python
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import os
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.destinations.vector_db_based.embedder import Embedder, create_from_config
from airbyte_cdk.destinations.vector_db_based.indexer import Indexer
from airbyte_cdk.destinations.vector_db_based.writer import Writer
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Status
from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode
from destination_snowflake_cortex.config import ConfigModel
from destination_snowflake_cortex.indexer import SnowflakeCortexIndexer

BATCH_SIZE = 32


class DestinationSnowflakeCortex(Destination):
    indexer: Indexer
    embedder: Embedder

    def _init_indexer(self, config: ConfigModel, configured_catalog: Optional[ConfiguredAirbyteCatalog] = None):
        self.embedder = create_from_config(config.embedding, config.processing)
        self.indexer = SnowflakeCortexIndexer(config.indexing, self.embedder.embedding_dimensions, configured_catalog)

    def write(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        parsed_config = ConfigModel.parse_obj(config)
        self._init_indexer(parsed_config, configured_catalog)
        writer = Writer(
            parsed_config.processing, self.indexer, self.embedder, batch_size=BATCH_SIZE, omit_raw_text=parsed_config.omit_raw_text
        )
        yield from writer.write(configured_catalog, input_messages)

    def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        try:
            parsed_config = ConfigModel.parse_obj(config)
            self._init_indexer(parsed_config)
            self.indexer.check()
            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
        except Exception as e:
            return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}")

    def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
        return ConnectorSpecification(
            documentationUrl="https://docs.airbyte.com/integrations/destinations/snowflake-cortex",
            supportsIncremental=True,
            supported_destination_sync_modes=[DestinationSyncMode.overwrite, DestinationSyncMode.append, DestinationSyncMode.append_dedup],
            connectionSpecification=ConfigModel.schema(),  # type: ignore[attr-defined]
        )
```
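
The commit's `main.py` (whitelisted in `.dockerignore` above) is not shown on this page; the conventional CDK entrypoint for a Python destination is a short sketch like this:
```python
import sys

from destination_snowflake_cortex import DestinationSnowflakeCortex

if __name__ == "__main__":
    # Dispatches spec/check/write based on the CLI arguments.
    DestinationSnowflakeCortex().run(sys.argv[1:])
```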
