@@ -70,7 +70,8 @@ def __init__(self, storage_dir: str, vector_size: int, base_url: str, token: st
70
70
self .vector_size = vector_size
71
71
self .colpali_client = ColPaliClient (base_url , token )
72
72
73
- def ingest (self , case_name : str , dataset , user_id : str ):
73
+ def ingest (self , case_name : str , dataset , user_id : str , batch_size : int = 50 ):
74
+ """Ingest a dataset of images into LanceDB in batches."""
74
75
logger .info ("start ingest" )
75
76
start_time = time .time ()
76
77
@@ -85,27 +86,35 @@ def ingest(self, case_name: str, dataset, user_id: str):
85
86
lance_client = lancedb .connect (f"{ self .storage_dir } /{ user_id } /{ case_name } " )
86
87
tbl = lance_client .create_table (case_name , schema = schema )
87
88
88
- # TODO: ingest in batches
89
-
90
89
with tqdm (total = len (dataset ), desc = "Indexing Progress" ) as pbar :
90
+ batch = []
91
91
for i in range (len (dataset )):
92
92
image = dataset [i ]["image" ]
93
93
response = self .colpali_client .process_pil_image (image )
94
94
image_embedding = response ["embedding" ]
95
95
96
- data = {
97
- "index" : dataset [i ]["index" ],
98
- "pdf_name" : dataset [i ]["pdf_name" ],
99
- "pdf_page" : dataset [i ]["pdf_page" ],
100
- "vector" : image_embedding ,
101
- }
96
+ batch .append (
97
+ {
98
+ "index" : dataset [i ]["index" ],
99
+ "pdf_name" : dataset [i ]["pdf_name" ],
100
+ "pdf_page" : dataset [i ]["pdf_page" ],
101
+ "vector" : image_embedding ,
102
+ }
103
+ )
104
+
105
+ if len (batch ) >= batch_size :
106
+ try :
107
+ tbl .add (batch )
108
+ except Exception as e :
109
+ logger .error (f"Error during upsert: { e } " )
110
+ batch = []
111
+ pbar .update (1 )
102
112
113
+ if batch :
103
114
try :
104
- tbl .add ([ data ] )
115
+ tbl .add (batch )
105
116
except Exception as e :
106
117
logger .error (f"Error during upsert: { e } " )
107
- continue
108
- pbar .update (1 )
109
118
110
119
tbl .create_index (metric = "cosine" )
111
120
0 commit comments