|
18 | 18 | import functools
|
19 | 19 | import io
|
20 | 20 | import os
|
| 21 | +import tempfile |
21 | 22 | import threading
|
22 | 23 | import time
|
23 |
| -from typing import Any, Dict, Optional, TYPE_CHECKING, Union, Callable |
| 24 | +from typing import Any, Dict, Optional, TYPE_CHECKING, Union, Callable, Literal |
24 | 25 |
|
25 | 26 | from google.cloud import bigquery
|
26 | 27 | from google.cloud import storage
|
@@ -250,25 +251,56 @@ def _read_gcs_file_contents(filepath: str) -> str:
|
250 | 251 | return blob.download_as_string().decode("utf-8")
|
251 | 252 |
|
252 | 253 |
|
| 254 | +def _upload_pandas_df_to_gcs( |
| 255 | + df: "pd.DataFrame", upload_gcs_path: str, file_type: Literal["csv", "jsonl"] |
| 256 | +) -> None: |
| 257 | + """Uploads the provided Pandas DataFrame to a GCS bucket. |
| 258 | +
|
| 259 | + Args: |
| 260 | + df: The Pandas DataFrame to upload. |
| 261 | + upload_gcs_path: The GCS path to upload the data file. |
| 262 | + file_type: The file type of the data file. |
| 263 | + """ |
| 264 | + |
| 265 | + with tempfile.TemporaryDirectory() as temp_dir: |
| 266 | + if file_type == "csv": |
| 267 | + local_dataset_path = os.path.join(temp_dir, "metrics_table.csv") |
| 268 | + df.to_csv(path_or_buf=local_dataset_path) |
| 269 | + elif file_type == "jsonl": |
| 270 | + local_dataset_path = os.path.join(temp_dir, "metrics_table.jsonl") |
| 271 | + df.to_json(path_or_buf=local_dataset_path, orient="records", lines=True) |
| 272 | + else: |
| 273 | + raise ValueError( |
| 274 | + f"Unsupported file type: {file_type} from {upload_gcs_path}." |
| 275 | + " Please provide a valid GCS path with `jsonl` or `csv` suffix." |
| 276 | + ) |
| 277 | + |
| 278 | + storage_client = storage.Client( |
| 279 | + project=initializer.global_config.project, |
| 280 | + credentials=initializer.global_config.credentials, |
| 281 | + ) |
| 282 | + storage.Blob.from_string( |
| 283 | + uri=upload_gcs_path, client=storage_client |
| 284 | + ).upload_from_filename(filename=local_dataset_path) |
| 285 | + |
| 286 | + |
253 | 287 | def upload_evaluation_results(
|
254 | 288 | dataset: "pd.DataFrame", destination_uri_prefix: str, file_name: str
|
255 |
| -): |
256 |
| - """Uploads eval results to GCS CSV destination.""" |
257 |
| - supported_file_types = ["csv"] |
| 289 | +) -> None: |
| 290 | + """Uploads eval results to GCS destination. |
| 291 | +
|
| 292 | + Args: |
| 293 | + dataset: Pandas dataframe to upload. |
| 294 | + destination_uri_prefix: GCS folder to store the data. |
| 295 | + file_name: File name to store the data. |
| 296 | + """ |
258 | 297 | if not destination_uri_prefix:
|
259 | 298 | return
|
260 | 299 | if destination_uri_prefix.startswith(_GCS_PREFIX):
|
261 | 300 | _, extension = os.path.splitext(file_name)
|
262 | 301 | file_type = extension.lower()[1:]
|
263 |
| - if file_type in supported_file_types: |
264 |
| - output_path = destination_uri_prefix + "/" + file_name |
265 |
| - utils.gcs_utils._upload_pandas_df_to_gcs(dataset, output_path) |
266 |
| - else: |
267 |
| - raise ValueError( |
268 |
| - "Unsupported file type in the GCS destination URI:" |
269 |
| - f" {file_name}, please provide a valid GCS" |
270 |
| - f" file name with a file type in {supported_file_types}." |
271 |
| - ) |
| 302 | + output_path = destination_uri_prefix + "/" + file_name |
| 303 | + _upload_pandas_df_to_gcs(dataset, output_path, file_type) |
272 | 304 | else:
|
273 | 305 | raise ValueError(
|
274 | 306 | f"Unsupported destination URI: {destination_uri_prefix}."
|
|
0 commit comments