
Commit 0c145d9

Bindi/support for pinecone serverless (#37756)
Co-authored-by: Roie Schwaber-Cohen <@rschwabco>
1 parent ddc3d2b commit 0c145d9

8 files changed, +247 −151 lines changed

8 files changed

+247
-151
lines changed

airbyte-integrations/connectors/destination-pinecone/destination_pinecone/destination.py

+32 −16
@@ -24,28 +24,44 @@ class DestinationPinecone(Destination):
     embedder: Embedder
 
     def _init_indexer(self, config: ConfigModel):
-        self.embedder = create_from_config(config.embedding, config.processing)
-        self.indexer = PineconeIndexer(config.indexing, self.embedder.embedding_dimensions)
+        try:
+            self.embedder = create_from_config(config.embedding, config.processing)
+            self.indexer = PineconeIndexer(config.indexing, self.embedder.embedding_dimensions)
+        except Exception as e:
+            return AirbyteConnectionStatus(status=Status.FAILED, message=str(e))
 
     def write(
         self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
     ) -> Iterable[AirbyteMessage]:
-        config_model = ConfigModel.parse_obj(config)
-        self._init_indexer(config_model)
-        writer = Writer(
-            config_model.processing, self.indexer, self.embedder, batch_size=BATCH_SIZE, omit_raw_text=config_model.omit_raw_text
-        )
-        yield from writer.write(configured_catalog, input_messages)
+        try:
+            config_model = ConfigModel.parse_obj(config)
+            self._init_indexer(config_model)
+            writer = Writer(
+                config_model.processing, self.indexer, self.embedder, batch_size=BATCH_SIZE, omit_raw_text=config_model.omit_raw_text
+            )
+            yield from writer.write(configured_catalog, input_messages)
+        except Exception as e:
+            yield AirbyteMessage(type="LOG", log=AirbyteLogger(level="ERROR", message=str(e)))
 
     def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
-        parsed_config = ConfigModel.parse_obj(config)
-        self._init_indexer(parsed_config)
-        checks = [self.embedder.check(), self.indexer.check(), DocumentProcessor.check_config(parsed_config.processing)]
-        errors = [error for error in checks if error is not None]
-        if len(errors) > 0:
-            return AirbyteConnectionStatus(status=Status.FAILED, message="\n".join(errors))
-        else:
-            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
+        try:
+            parsed_config = ConfigModel.parse_obj(config)
+            init_status = self._init_indexer(parsed_config)
+            if init_status and init_status.status == Status.FAILED:
+                logger.error(f"Initialization failed with message: {init_status.message}")
+                return init_status  # Return the failure status immediately if initialization fails
+
+            checks = [self.embedder.check(), self.indexer.check(), DocumentProcessor.check_config(parsed_config.processing)]
+            errors = [error for error in checks if error is not None]
+            if len(errors) > 0:
+                error_message = "\n".join(errors)
+                logger.error(f"Configuration check failed: {error_message}")
+                return AirbyteConnectionStatus(status=Status.FAILED, message=error_message)
+            else:
+                return AirbyteConnectionStatus(status=Status.SUCCEEDED)
+        except Exception as e:
+            logger.error(f"Exception during configuration check: {str(e)}")
+            return AirbyteConnectionStatus(status=Status.FAILED, message=str(e))
 
     def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
         return ConnectorSpecification(
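
For context, a minimal sketch of how the hardened check() behaves from the caller's side: a bad credential now surfaces as a FAILED AirbyteConnectionStatus instead of an uncaught exception. The embedding and processing shapes below are illustrative assumptions, not taken from this diff; only the indexing fields mirror ones that appear in it.

```python
import logging

from destination_pinecone.destination import DestinationPinecone

# Hypothetical config; embedding/processing shapes are assumed for illustration.
config = {
    "embedding": {"mode": "openai", "openai_key": "sk-..."},  # assumed shape
    "processing": {"chunk_size": 1000},                       # assumed shape
    "indexing": {
        "mode": "pinecone",
        "pinecone_key": "bad-key",
        "index": "testdata",
        "pinecone_environment": "us-west1-gcp",
    },
}

outcome = DestinationPinecone().check(logging.getLogger("airbyte"), config)
# With the try/except added above, a bad key yields a FAILED status with a
# message rather than a stack trace.
print(outcome.status, outcome.message)
```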

airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py

+60 −18
@@ -5,13 +5,15 @@
 import uuid
 from typing import Optional
 
-import pinecone
 import urllib3
 from airbyte_cdk.destinations.vector_db_based.document_processor import METADATA_RECORD_ID_FIELD, METADATA_STREAM_FIELD
 from airbyte_cdk.destinations.vector_db_based.indexer import Indexer
 from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, create_stream_identifier, format_exception
+from airbyte_cdk.models import AirbyteConnectionStatus, Status
 from airbyte_cdk.models.airbyte_protocol import ConfiguredAirbyteCatalog, DestinationSyncMode
 from destination_pinecone.config import PineconeIndexingModel
+from pinecone import PineconeException
+from pinecone.grpc import PineconeGRPC
 
 # large enough to speed up processing, small enough to not hit pinecone request limits
 PINECONE_BATCH_SIZE = 40
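
The import swap tracks the pinecone-client v3 migration: the module-level pinecone.init(...) of the v2 SDK is gone, replaced by a client object that index handles hang off of. A minimal sketch of the difference, with a placeholder key:

```python
from pinecone.grpc import PineconeGRPC

# v2 SDK (removed above):  pinecone.init(api_key=..., environment=...)
# v3+ SDK (added above):   construct a client, then get index handles from it.
pc = PineconeGRPC(api_key="pc-...")  # placeholder key
index = pc.Index("testdata")         # handle lives on the client, not the module
```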
@@ -29,32 +31,54 @@ class PineconeIndexer(Indexer):
 
     def __init__(self, config: PineconeIndexingModel, embedding_dimensions: int):
         super().__init__(config)
-        pinecone.init(api_key=config.pinecone_key, environment=config.pinecone_environment, threaded=True)
+        try:
+            self.pc = PineconeGRPC(api_key=config.pinecone_key, threaded=True)
+        except PineconeException as e:
+            return AirbyteConnectionStatus(status=Status.FAILED, message=str(e))
 
-        self.pinecone_index = pinecone.GRPCIndex(config.index)
+        self.pinecone_index = self.pc.Index(config.index)
         self.embedding_dimensions = embedding_dimensions
 
+    def determine_spec_type(self, index_name):
+        description = self.pc.describe_index(index_name)
+        spec_keys = description.get("spec", {})
+        if "pod" in spec_keys:
+            return "pod"
+        elif "serverless" in spec_keys:
+            return "serverless"
+        else:
+            raise ValueError("Unknown index specification type.")
+
     def pre_sync(self, catalog: ConfiguredAirbyteCatalog):
-        index_description = pinecone.describe_index(self.config.index)
-        self._pod_type = index_description.pod_type
+        self._pod_type = self.determine_spec_type(self.config.index)
+
         for stream in catalog.streams:
+            stream_identifier = create_stream_identifier(stream.stream)
             if stream.destination_sync_mode == DestinationSyncMode.overwrite:
                 self.delete_vectors(
-                    filter={METADATA_STREAM_FIELD: create_stream_identifier(stream.stream)}, namespace=stream.stream.namespace
+                    filter={METADATA_STREAM_FIELD: stream_identifier}, namespace=stream.stream.namespace, prefix=stream_identifier
                 )
 
     def post_sync(self):
         return []
 
-    def delete_vectors(self, filter, namespace=None):
+    def delete_vectors(self, filter, namespace=None, prefix=None):
         if self._pod_type == "starter":
             # Starter pod types have a maximum of 100000 rows
             top_k = 10000
             self.delete_by_metadata(filter, top_k, namespace)
+        elif self._pod_type == "serverless":
+            if prefix == None:
+                raise ValueError("Prefix is required for a serverless index.")
+            self.delete_by_prefix(prefix=prefix, namespace=namespace)
         else:
+            # Pod spec
             self.pinecone_index.delete(filter=filter, namespace=namespace)
 
     def delete_by_metadata(self, filter, top_k, namespace=None):
+        """
+        Applicable to Starter implementation only. Deletes all vectors that match the given metadata filter.
+        """
         zero_vector = [0.0] * self.embedding_dimensions
         query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k, namespace=namespace)
         while len(query_result.matches) > 0:
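
determine_spec_type() keys off which child appears under "spec" in the describe_index() response. A standalone sketch of that membership test; the response shapes are illustrative, not captured from a real call:

```python
# Illustrative describe_index() response shapes; field values are examples.
pod_description = {"name": "testdata", "spec": {"pod": {"environment": "us-west1-gcp", "pod_type": "p1.x1"}}}
serverless_description = {"name": "testdata", "spec": {"serverless": {"cloud": "aws", "region": "us-east-1"}}}

def spec_type(description: dict) -> str:
    spec_keys = description.get("spec", {})  # same lookup as determine_spec_type()
    if "pod" in spec_keys:
        return "pod"
    if "serverless" in spec_keys:
        return "serverless"
    raise ValueError("Unknown index specification type.")

assert spec_type(pod_description) == "pod"
assert spec_type(serverless_description) == "serverless"
```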
@@ -66,6 +90,13 @@ def delete_by_metadata(self, filter, top_k, namespace=None):
             self.pinecone_index.delete(ids=list(batch), namespace=namespace)
             query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k, namespace=namespace)
 
+    def delete_by_prefix(self, prefix, namespace=None):
+        """
+        Applicable to Serverless implementation only. Deletes all vectors with the given prefix.
+        """
+        for ids in self.pinecone_index.list(prefix=prefix, namespace=namespace):
+            self.pinecone_index.delete(ids=ids, namespace=namespace)
+
     def _truncate_metadata(self, metadata: dict) -> dict:
         """
         Normalize metadata to ensure it is within the size limit and doesn't contain complex objects.
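
Serverless indexes do not support the metadata-filtered deletes the pod paths rely on, so delete_by_prefix() pages through ids instead: index.list(prefix=...) yields batches of vector ids, each of which is deleted directly. A minimal standalone sketch of the same pattern:

```python
def delete_stream_vectors(pinecone_index, stream_identifier: str, namespace=None) -> None:
    # list() yields pages of ids sharing the prefix; serverless deletes must
    # go by id rather than by metadata filter.
    for id_page in pinecone_index.list(prefix=stream_identifier, namespace=namespace):
        pinecone_index.delete(ids=id_page, namespace=namespace)
```

This only works because index() (next hunk) now writes ids with the stream identifier as a prefix.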
@@ -85,34 +116,45 @@ def _truncate_metadata(self, metadata: dict) -> dict:
 
         return result
 
-    def index(self, document_chunks, namespace, stream):
+    def index(self, document_chunks, namespace, streamName):
         pinecone_docs = []
         for i in range(len(document_chunks)):
             chunk = document_chunks[i]
             metadata = self._truncate_metadata(chunk.metadata)
             if chunk.page_content is not None:
                 metadata["text"] = chunk.page_content
-            pinecone_docs.append((str(uuid.uuid4()), chunk.embedding, metadata))
+            prefix = streamName
+            pinecone_docs.append((prefix + "#" + str(uuid.uuid4()), chunk.embedding, metadata))
         serial_batches = create_chunks(pinecone_docs, batch_size=PINECONE_BATCH_SIZE * PARALLELISM_LIMIT)
         for batch in serial_batches:
-            async_results = [
-                self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False, namespace=namespace)
-                for ids_vectors_chunk in create_chunks(batch, batch_size=PINECONE_BATCH_SIZE)
-            ]
+            async_results = []
+            for ids_vectors_chunk in create_chunks(batch, batch_size=PINECONE_BATCH_SIZE):
+                async_result = self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False)
+                async_results.append(async_result)
             # Wait for and retrieve responses (this raises in case of error)
             [async_result.result() for async_result in async_results]
 
     def delete(self, delete_ids, namespace, stream):
+        filter = {METADATA_RECORD_ID_FIELD: {"$in": delete_ids}}
         if len(delete_ids) > 0:
-            self.delete_vectors(filter={METADATA_RECORD_ID_FIELD: {"$in": delete_ids}}, namespace=namespace)
+            if self._pod_type == "starter":
+                # Starter pod types have a maximum of 100000 rows
+                top_k = 10000
+                self.delete_by_metadata(filter=filter, top_k=top_k, namespace=namespace)
+            elif self._pod_type == "serverless":
+                self.pinecone_index.delete(ids=delete_ids, namespace=namespace)
+            else:
+                # Pod spec
+                self.pinecone_index.delete(filter=filter, namespace=namespace)
 
     def check(self) -> Optional[str]:
         try:
-            indexes = pinecone.list_indexes()
-            if self.config.index not in indexes:
+            list = self.pc.list_indexes()
+            index_names = [index["name"] for index in list.indexes]
+            if self.config.index not in index_names:
                 return f"Index {self.config.index} does not exist in environment {self.config.pinecone_environment}."
 
-            description = pinecone.describe_index(self.config.index)
+            description = self.pc.describe_index(self.config.index)
             actual_dimension = int(description.dimension)
             if actual_dimension != self.embedding_dimensions:
                 return f"Your embedding configuration will produce vectors with dimension {self.embedding_dimensions:d}, but your index is configured with dimension {actual_dimension:d}. Make sure embedding and indexing configurations match."
@@ -121,7 +163,7 @@ def check(self) -> Optional[str]:
             if f"Failed to resolve 'controller.{self.config.pinecone_environment}.pinecone.io'" in str(e.reason):
                 return f"Failed to resolve environment, please check whether {self.config.pinecone_environment} is correct."
 
-            if isinstance(e, pinecone.exceptions.UnauthorizedException):
+            if isinstance(e, PineconeException):
                 if e.body:
                     return e.body

airbyte-integrations/connectors/destination-pinecone/integration_tests/pinecone_integration_test.py

+37 −9
@@ -4,33 +4,53 @@
 
 import json
 import logging
+import time
 
-import pinecone
 from airbyte_cdk.destinations.vector_db_based.embedder import OPEN_AI_VECTOR_SIZE
 from airbyte_cdk.destinations.vector_db_based.test_utils import BaseIntegrationTest
 from airbyte_cdk.models import DestinationSyncMode, Status
 from destination_pinecone.destination import DestinationPinecone
 from langchain.embeddings import OpenAIEmbeddings
 from langchain.vectorstores import Pinecone
+from pinecone import Pinecone as PineconeREST
+from pinecone import PineconeException
+from pinecone.grpc import PineconeGRPC
 
 
 class PineconeIntegrationTest(BaseIntegrationTest):
     def _init_pinecone(self):
-        pinecone.init(api_key=self.config["indexing"]["pinecone_key"], environment=self.config["indexing"]["pinecone_environment"])
-        self.pinecone_index = pinecone.Index(self.config["indexing"]["index"])
-
+        self.pc = PineconeGRPC(api_key=self.config["indexing"]["pinecone_key"])
+        self.pinecone_index = self.pc.Index(self.config["indexing"]["index"])
+        self.pc_rest = PineconeREST(api_key=self.config["indexing"]["pinecone_key"])
+        self.pinecone_index_rest = self.pc_rest.Index(name=self.config["indexing"]["index"])
+
+    def _wait(self):
+        print("Waiting for Pinecone...", end="", flush=True)
+        for i in range(15):
+            time.sleep(1)
+            print(".", end="", flush=True)
+        print()  # Move to the next line after the loop
+
     def setUp(self):
         with open("secrets/config.json", "r") as f:
             self.config = json.loads(f.read())
         self._init_pinecone()
 
     def tearDown(self):
-        # make sure pinecone is initialized correctly before cleaning up
+        self._wait()
+        # make sure pinecone is initialized correctly before cleaning up
         self._init_pinecone()
-        self.pinecone_index.delete(delete_all=True)
+        try:
+            self.pinecone_index.delete(delete_all=True)
+        except PineconeException as e:
+            if "Namespace not found" not in str(e):
+                raise e
+            else:
+                print("Nothing to delete. No data in the index/namespace.")
 
     def test_check_valid_config(self):
-        outcome = DestinationPinecone().check(logging.getLogger("airbyte"), self.config)
+        outcome = DestinationPinecone().check(logging.getLogger("airbyte"), self.config)
         assert outcome.status == Status.SUCCEEDED
 
     def test_check_invalid_config(self):
@@ -43,10 +63,11 @@ def test_check_invalid_config(self):
                     "mode": "pinecone",
                     "pinecone_key": "mykey",
                     "index": "testdata",
-                    "pinecone_environment": "asia-southeast1-gcp-free",
+                    "pinecone_environment": "us-west1-gcp",
                 },
             },
         )
+
         assert outcome.status == Status.FAILED
 
     def test_write(self):
@@ -57,14 +78,21 @@ def test_write(self):
         # initial sync
         destination = DestinationPinecone()
         list(destination.write(self.config, catalog, [*first_record_chunk, first_state_message]))
+
+        self._wait()
         assert self.pinecone_index.describe_index_stats().total_vector_count == 5
 
         # incrementally update a doc
         incremental_catalog = self._get_configured_catalog(DestinationSyncMode.append_dedup)
         list(destination.write(self.config, incremental_catalog, [self._record("mystream", "Cats are nice", 2), first_state_message]))
+
+        self._wait()
         result = self.pinecone_index.query(
             vector=[0] * OPEN_AI_VECTOR_SIZE, top_k=10, filter={"_ab_record_id": "mystream_2"}, include_metadata=True
         )
+
         assert len(result.matches) == 1
         assert (
             result.matches[0].metadata["text"] == "str_col: Cats are nice"
@@ -73,6 +101,6 @@ def test_write(self):
         # test langchain integration
         embeddings = OpenAIEmbeddings(openai_api_key=self.config["embedding"]["openai_key"])
         self._init_pinecone()
-        vector_store = Pinecone(self.pinecone_index, embeddings.embed_query, "text")
+        vector_store = Pinecone(self.pinecone_index_rest, embeddings.embed_query, "text")
         result = vector_store.similarity_search("feline animals", 1)
         assert result[0].metadata["_ab_record_id"] == "mystream_2"
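
The fixed 15-second _wait() compensates for the lag between an upsert and its visibility in Pinecone queries and stats. A hedged alternative, not part of this commit, would poll for the expected count instead of sleeping unconditionally:

```python
import time

def wait_for_vector_count(index, expected: int, timeout_s: float = 60.0) -> None:
    # Poll describe_index_stats() until the upserts become visible or we time out.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if index.describe_index_stats().total_vector_count == expected:
            return
        time.sleep(1)
    raise TimeoutError(f"index did not reach {expected} vectors within {timeout_s}s")
```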

airbyte-integrations/connectors/destination-pinecone/metadata.yaml

+1 −1
@@ -13,7 +13,7 @@ data:
   connectorSubtype: vectorstore
   connectorType: destination
   definitionId: 3d2b6f84-7f0d-4e3f-a5e5-7c7d4b50eabd
-  dockerImageTag: 0.0.24
+  dockerImageTag: 0.1.0
   dockerRepository: airbyte/destination-pinecone
   documentationUrl: https://docs.airbyte.com/integrations/destinations/pinecone
   githubIssueLabel: destination-pinecone
