9
9
10
10
from airbyte_cdk .destinations .vector_db_based .embedder import OPEN_AI_VECTOR_SIZE
11
11
from airbyte_cdk .destinations .vector_db_based .test_utils import BaseIntegrationTest
12
- from airbyte_cdk .models import DestinationSyncMode , Status
12
+ from airbyte_cdk .models import (
13
+ AirbyteMessage ,
14
+ AirbyteRecordMessage ,
15
+ AirbyteStateMessage ,
16
+ AirbyteStream ,
17
+ ConfiguredAirbyteCatalog ,
18
+ ConfiguredAirbyteStream ,
19
+ DestinationSyncMode ,
20
+ Status ,
21
+ SyncMode ,
22
+ Type ,
23
+ )
13
24
from destination_pinecone .destination import DestinationPinecone
14
25
from langchain .embeddings import OpenAIEmbeddings
15
26
from langchain .vectorstores import Pinecone
@@ -47,7 +58,14 @@ def tearDown(self):
47
58
if "Namespace not found" not in str (e ):
48
59
raise (e )
49
60
else :
50
- print ("Noting to delete. No data in the index/namespace." )
61
+ print ("Nothing to delete in default namespace. No data in the index/namespace." )
62
+ try :
63
+ self .pinecone_index .delete (delete_all = True , namespace = "ns1" )
64
+ except PineconeException as e :
65
+ if "Namespace not found" not in str (e ):
66
+ raise (e )
67
+ else :
68
+ print ("Nothing to delete in ns1 namespace. No data in the index/namespace." )
51
69
52
70
def test_integration_test_flag_is_set (self ):
53
71
assert "PYTEST_CURRENT_TEST" in os .environ
@@ -107,3 +125,44 @@ def test_write(self):
107
125
vector_store = Pinecone (self .pinecone_index_rest , embeddings .embed_query , "text" )
108
126
result = vector_store .similarity_search ("feline animals" , 1 )
109
127
assert result [0 ].metadata ["_ab_record_id" ] == "mystream_2"
128
+
129
def test_write_with_namespace(self):
    """Sync five namespaced records in overwrite mode and verify the index holds five vectors."""
    configured_catalog = self._get_configured_catalog_with_namespace(DestinationSyncMode.overwrite)
    state_msg = self._state({"state": "1"})
    record_msgs = []
    for i in range(5):
        record_msgs.append(self._record_with_namespace("mystream", f"Dogs are number { i } ", i))

    # Initial sync. list() consumes everything write() yields so the whole
    # input (records followed by the state message) is processed.
    pinecone_destination = DestinationPinecone()
    input_messages = record_msgs + [state_msg]
    list(pinecone_destination.write(self.config, configured_catalog, input_messages))

    self._wait()
    index_stats = self.pinecone_index.describe_index_stats()
    assert index_stats.total_vector_count == 5
140
+
141
+
142
def _get_configured_catalog_with_namespace(self, destination_mode: DestinationSyncMode) -> ConfiguredAirbyteCatalog:
    """Build a single-stream configured catalog for stream "mystream" in namespace "ns1".

    Args:
        destination_mode: Destination sync mode to set on the configured stream.

    Returns:
        A ConfiguredAirbyteCatalog containing one incremental stream whose
        primary key is ``int_col``.
    """
    stream_schema = {
        "type": "object",
        "properties": {
            # JSON Schema names the text type "string"; "str" is not a valid type keyword value.
            "str_col": {"type": "string"},
            "int_col": {"type": "integer"},
            "random_col": {"type": "integer"},
        },
    }

    # Named neutrally: the destination sync mode comes from the caller and is
    # not necessarily overwrite.
    configured_stream = ConfiguredAirbyteStream(
        stream=AirbyteStream(
            name="mystream",
            namespace="ns1",
            json_schema=stream_schema,
            supported_sync_modes=[SyncMode.incremental, SyncMode.full_refresh],
        ),
        primary_key=[["int_col"]],
        sync_mode=SyncMode.incremental,
        destination_sync_mode=destination_mode,
    )

    return ConfiguredAirbyteCatalog(streams=[configured_stream])
158
+
159
def _record_with_namespace(self, stream: str, str_value: str, int_value: int) -> AirbyteMessage:
    """Return a RECORD message for *stream* in namespace "ns1" carrying the two given column values."""
    payload = {"str_col": str_value, "int_col": int_value}
    record = AirbyteRecordMessage(
        stream=stream,
        namespace="ns1",
        data=payload,
        emitted_at=0,
    )
    return AirbyteMessage(type=Type.RECORD, record=record)
166
+
167
+
168
+
0 commit comments