
Commit 07e471e

vertex-sdk-bot authored and copybara-github committed

feat: add support for partial failures sink in import rag files.

PiperOrigin-RevId: 675672254

1 parent 66d84af commit 07e471e

File tree

2 files changed: +37 −0 lines changed

vertexai/preview/rag/rag_data.py (+20)
```diff
@@ -293,6 +293,7 @@ def import_files(
     timeout: int = 600,
     max_embedding_requests_per_min: int = 1000,
     use_advanced_pdf_parsing: Optional[bool] = False,
+    partial_failures_sink: Optional[str] = None,
 ) -> ImportRagFilesResponse:
     """
     Import files to an existing RagCorpus, wait until completion.
@@ -378,6 +379,14 @@ def import_files(
         timeout: Default is 600 seconds.
         use_advanced_pdf_parsing: Whether to use advanced PDF
             parsing on uploaded files.
+        partial_failures_sink: Either a GCS path to store partial failures or a
+            BigQuery table to store partial failures. The format is
+            "gs://my-bucket/my/object.ndjson" for GCS or
+            "bq://my-project.my-dataset.my-table" for BigQuery. An existing GCS
+            object cannot be used. However, the BigQuery table may or may not
+            exist - if it does not exist, it will be created. If it does exist,
+            the schema will be checked and the partial failures will be appended
+            to the table.
     Returns:
         ImportRagFilesResponse.
     """
@@ -394,6 +403,7 @@ def import_files(
         chunk_overlap=chunk_overlap,
         max_embedding_requests_per_min=max_embedding_requests_per_min,
         use_advanced_pdf_parsing=use_advanced_pdf_parsing,
+        partial_failures_sink=partial_failures_sink,
     )
     client = _gapic_utils.create_rag_data_service_client()
     try:
@@ -412,6 +422,7 @@ async def import_files_async(
     chunk_overlap: int = 200,
     max_embedding_requests_per_min: int = 1000,
     use_advanced_pdf_parsing: Optional[bool] = False,
+    partial_failures_sink: Optional[str] = None,
 ) -> operation_async.AsyncOperation:
     """
     Import files to an existing RagCorpus asynchronously.
@@ -497,6 +508,14 @@ async def import_files_async(
             QPM would be used.
         use_advanced_pdf_parsing: Whether to use advanced PDF
             parsing on uploaded files.
+        partial_failures_sink: Either a GCS path to store partial failures or a
+            BigQuery table to store partial failures. The format is
+            "gs://my-bucket/my/object.ndjson" for GCS or
+            "bq://my-project.my-dataset.my-table" for BigQuery. An existing GCS
+            object cannot be used. However, the BigQuery table may or may not
+            exist - if it does not exist, it will be created. If it does exist,
+            the schema will be checked and the partial failures will be appended
+            to the table.
     Returns:
         operation_async.AsyncOperation.
     """
@@ -513,6 +532,7 @@ async def import_files_async(
         chunk_overlap=chunk_overlap,
         max_embedding_requests_per_min=max_embedding_requests_per_min,
         use_advanced_pdf_parsing=use_advanced_pdf_parsing,
+        partial_failures_sink=partial_failures_sink,
     )
     async_client = _gapic_utils.create_rag_data_service_async_client()
     try:
```
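From the caller's side, the new parameter simply threads through `import_files` (and its async twin) into the request builder. A minimal usage sketch, assuming the existing `corpus_name`/`paths` signature of `rag.import_files`; the project, bucket, and table names below are placeholders:

```python
from vertexai.preview import rag

# Placeholder corpus resource name (projects/.../locations/.../ragCorpora/...).
corpus_name = "projects/my-project/locations/us-central1/ragCorpora/123"

# Record per-file import failures as NDJSON in a new GCS object...
response = rag.import_files(
    corpus_name=corpus_name,
    paths=["gs://my-bucket/my-docs/"],
    partial_failures_sink="gs://my-bucket/failures/import.ndjson",
)

# ...or append them to a BigQuery table (created if it does not exist).
response = rag.import_files(
    corpus_name=corpus_name,
    paths=["gs://my-bucket/my-docs/"],
    partial_failures_sink="bq://my-project.my-dataset.import_failures",
)
```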

vertexai/preview/rag/utils/_gapic_utils.py (+17)
```diff
@@ -247,6 +247,7 @@ def prepare_import_files_request(
     chunk_overlap: int = 200,
     max_embedding_requests_per_min: int = 1000,
     use_advanced_pdf_parsing: bool = False,
+    partial_failures_sink: Optional[str] = None,
 ) -> ImportRagFilesRequest:
     if len(corpus_name.split("/")) != 6:
         raise ValueError(
@@ -289,6 +290,22 @@ def prepare_import_files_request(
         )
         import_rag_files_config.google_drive_source = google_drive_source
 
+    if partial_failures_sink is not None:
+        if partial_failures_sink.startswith("gs://"):
+            import_rag_files_config.partial_failure_gcs_sink.output_uri_prefix = (
+                partial_failures_sink
+            )
+        elif partial_failures_sink.startswith(
+            "bq://"
+        ) or partial_failures_sink.startswith("bigquery://"):
+            import_rag_files_config.partial_failure_bigquery_sink.output_uri = (
+                partial_failures_sink
+            )
+        else:
+            raise ValueError(
+                "if provided, partial_failures_sink must be a GCS path or a BigQuery table."
+            )
+
     request = ImportRagFilesRequest(
         parent=corpus_name, import_rag_files_config=import_rag_files_config
     )
```
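The dispatch in `prepare_import_files_request` is keyed purely on the URI scheme: `gs://` selects the GCS sink, while `bq://` or `bigquery://` selects the BigQuery sink. A self-contained sketch (not part of the commit) of the same classification rule, handy for validating a sink URI before kicking off a long-running import:

```python
from typing import Tuple


def classify_partial_failures_sink(sink: str) -> Tuple[str, str]:
    """Return ("gcs" | "bigquery", sink), mirroring the prefixes accepted
    by prepare_import_files_request; raise ValueError otherwise."""
    if sink.startswith("gs://"):
        # Routed to partial_failure_gcs_sink.output_uri_prefix in the diff.
        return ("gcs", sink)
    if sink.startswith("bq://") or sink.startswith("bigquery://"):
        # Routed to partial_failure_bigquery_sink.output_uri in the diff.
        return ("bigquery", sink)
    raise ValueError(
        "if provided, partial_failures_sink must be a GCS path or a BigQuery table."
    )


# Both spellings of the BigQuery scheme are accepted.
assert classify_partial_failures_sink("gs://my-bucket/failures.ndjson")[0] == "gcs"
assert classify_partial_failures_sink("bq://my-project.my-dataset.t")[0] == "bigquery"
```

Note that an unrecognized scheme fails fast with a `ValueError` rather than being silently ignored, so a typo in the sink URI surfaces before the import request is sent.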
