Commit 352e8e4

fix: read_csv supports tilde local paths and includes index for bigquery_streaming write engine (#1580)

1 parent 45c9d9f · commit 352e8e4
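In practical terms, the commit enables usage like the following (a hedged sketch, not code from the commit; `session` is a bigframes `Session`, and `pandas_df` and the paths are placeholders):

```python
# "~" in a local path is now expanded via os.path.expanduser before the
# BigQuery load job reads the file.
df = session.read_csv("~/data/scalars.csv")

# With write_engine="bigquery_streaming", the pandas DataFrame's index is
# now written to BigQuery instead of coming back as NULLs.
bf_df = session.read_pandas(pandas_df, write_engine="bigquery_streaming")
```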

File tree: 2 files changed (+70 −91 lines)

bigframes/session/loader.py (+5 −2)
```diff
@@ -221,10 +221,12 @@ def read_pandas_streaming(
             [ordering_col],
         )
         destination_table = bigquery.Table(destination, schema=schema)
-        # TODO(swast): Confirm that the index is written.
+
+        # `insert_rows_from_dataframe` does not include the DataFrame's index,
+        # so reset_index() is called first.
         for errors in self._bqclient.insert_rows_from_dataframe(
             destination_table,
-            pandas_dataframe_copy,
+            pandas_dataframe_copy.reset_index(),
         ):
             if errors:
                 raise ValueError(
@@ -509,6 +511,7 @@ def _read_bigquery_load_job(
         job_config.clustering_fields = index_cols[:_MAX_CLUSTER_COLUMNS]

         if isinstance(filepath_or_buffer, str):
+            filepath_or_buffer = os.path.expanduser(filepath_or_buffer)
             if filepath_or_buffer.startswith("gs://"):
                 load_job = self._bqclient.load_table_from_uri(
                     filepath_or_buffer, table, job_config=job_config
```
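Both changes build on standard library and pandas behavior. A minimal, self-contained illustration of the two building blocks (not from the commit; values are illustrative):

```python
import os

import pandas as pd

# os.path.expanduser replaces a leading "~" with the user's home directory;
# other paths (including gs:// URIs) pass through unchanged.
print(os.path.expanduser("~/data.csv"))         # e.g. /home/alice/data.csv
print(os.path.expanduser("gs://bucket/a.csv"))  # gs://bucket/a.csv

# reset_index() promotes the index to a regular column, so a row-based
# writer that ignores the index (like insert_rows_from_dataframe) still
# receives the index values as data.
df = pd.DataFrame({"a": [1, 2]}, index=pd.Index([10, 20], name="rowindex"))
print(df.reset_index().columns.tolist())        # ['rowindex', 'a']
```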

tests/system/small/test_session.py (+65 −89)
```diff
@@ -851,8 +851,6 @@ def test_read_pandas_timedelta_dataframes(session, write_engine):
         .astype("timedelta64[ns]")
     )

-    if write_engine == "bigquery_streaming":
-        expected_df.index = pd.Index([pd.NA] * 3, dtype="Int64")
     pd.testing.assert_frame_equal(actual_result, expected_df, check_index_type=False)


@@ -869,16 +867,14 @@ def test_read_pandas_timedelta_series(session, write_engine):
         .astype("timedelta64[ns]")
     )

-    if write_engine == "bigquery_streaming":
-        expected_series.index = pd.Index([pd.NA] * 3, dtype="Int64")
     pd.testing.assert_series_equal(
         actual_result, expected_series, check_index_type=False
     )


 @pytest.mark.parametrize(
     "write_engine",
-    ["default", "bigquery_inline", "bigquery_load"],
+    ["default", "bigquery_inline", "bigquery_load", "bigquery_streaming"],
 )
 def test_read_pandas_timedelta_index(session, write_engine):
     expected_index = pd.to_timedelta(
```
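These assertions rely on `check_index_type=False`, which skips the index class/dtype comparison so that engines producing differently typed but equal-valued indexes still compare equal. A minimal pandas-only sketch of what that tolerates (illustrative data):

```python
import pandas as pd

# Same index values, different index dtypes (int64 vs. nullable Int64).
a = pd.DataFrame({"x": [1, 2, 3]}, index=pd.Index([0, 1, 2], dtype="int64"))
b = pd.DataFrame({"x": [1, 2, 3]}, index=pd.Index([0, 1, 2], dtype="Int64"))

# Passes: the values match and the index dtype mismatch is ignored.
pd.testing.assert_frame_equal(a, b, check_index_type=False)
```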
```diff
@@ -918,14 +914,17 @@ def test_read_pandas_json_dataframes(session, write_engine):
         expected_df, write_engine=write_engine
     ).to_pandas()

-    if write_engine == "bigquery_streaming":
-        expected_df.index = pd.Index([pd.NA] * 4, dtype="Int64")
     pd.testing.assert_frame_equal(actual_result, expected_df, check_index_type=False)


 @pytest.mark.parametrize(
-    "write_engine",
-    ["default", "bigquery_load"],
+    ("write_engine"),
+    [
+        pytest.param("default"),
+        pytest.param("bigquery_load"),
+        pytest.param("bigquery_streaming"),
+        pytest.param("bigquery_inline", marks=pytest.mark.xfail(raises=ValueError)),
+    ],
 )
 def test_read_pandas_json_series(session, write_engine):
     json_data = [
@@ -949,6 +948,8 @@ def test_read_pandas_json_series(session, write_engine):
     [
         pytest.param("default"),
         pytest.param("bigquery_load"),
+        pytest.param("bigquery_streaming"),
+        pytest.param("bigquery_inline", marks=pytest.mark.xfail(raises=ValueError)),
     ],
 )
 def test_read_pandas_json_index(session, write_engine):
@@ -970,6 +971,8 @@ def test_read_pandas_json_index(session, write_engine):
     [
         pytest.param("default"),
         pytest.param("bigquery_load"),
+        pytest.param("bigquery_streaming"),
+        pytest.param("bigquery_inline", marks=pytest.mark.xfail(raises=ValueError)),
     ],
 )
 def test_read_pandas_w_nested_json(session, write_engine):
@@ -997,6 +1000,8 @@ def test_read_pandas_w_nested_json(session, write_engine):
     [
         pytest.param("default"),
         pytest.param("bigquery_load"),
+        pytest.param("bigquery_streaming"),
+        pytest.param("bigquery_inline", marks=pytest.mark.xfail(raises=ValueError)),
     ],
 )
 def test_read_pandas_w_nested_json_index(session, write_engine):
```
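The expanded parametrizations mark `bigquery_inline` with `xfail(raises=ValueError)`: each run passes only if that engine raises `ValueError` while the other engines succeed. A self-contained sketch of the pattern (the `write` helper is invented for illustration):

```python
import pytest


def write(value, engine):
    # Invented stand-in for the real write path: inline rejects the input.
    if engine == "bigquery_inline":
        raise ValueError("not supported by inline writes")
    return value


@pytest.mark.parametrize(
    "engine",
    [
        pytest.param("default"),
        pytest.param("bigquery_load"),
        pytest.param("bigquery_streaming"),
        pytest.param("bigquery_inline", marks=pytest.mark.xfail(raises=ValueError)),
    ],
)
def test_write(engine):
    assert write(42, engine) == 42
```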
```diff
@@ -1031,52 +1036,43 @@
         ("bigquery_streaming",),
     ),
 )
-def test_read_csv_gcs_default_engine(session, scalars_dfs, gcs_folder, write_engine):
-    scalars_df, _ = scalars_dfs
+def test_read_csv_for_gcs_file_w_default_engine(
+    session, scalars_dfs, gcs_folder, write_engine
+):
+    scalars_df, scalars_pandas_df = scalars_dfs
     path = gcs_folder + "test_read_csv_gcs_default_engine_w_index*.csv"
     read_path = utils.get_first_file_from_wildcard(path)
-    scalars_df.to_csv(path, index=False)
+    scalars_df.to_csv(path, index=True)
     dtype = scalars_df.dtypes.to_dict()
     dtype.pop("geography_col")
-    df = session.read_csv(
+    result_df = session.read_csv(
         read_path,
         # Convert default pandas dtypes to match BigQuery DataFrames dtypes.
         dtype=dtype,
         write_engine=write_engine,
+        index_col="rowindex",
     )

-    # TODO(chelsealin): If we serialize the index, can more easily compare values.
-    pd.testing.assert_index_equal(df.columns, scalars_df.columns)
-
     # The auto detects of BigQuery load job have restrictions to detect the bytes,
-    # numeric and geometry types, so they're skipped here.
-    df = df.drop(columns=["bytes_col", "numeric_col", "geography_col"])
-    scalars_df = scalars_df.drop(columns=["bytes_col", "numeric_col", "geography_col"])
-    assert df.shape[0] == scalars_df.shape[0]
-    pd.testing.assert_series_equal(df.dtypes, scalars_df.dtypes)
+    # datetime, numeric and geometry types, so they're skipped here.
+    drop_columns = ["bytes_col", "numeric_col", "geography_col"]
+    result_df = result_df.drop(columns=drop_columns)
+    scalars_pandas_df = scalars_pandas_df.drop(columns=drop_columns)
+    pd.testing.assert_frame_equal(result_df.to_pandas(), scalars_pandas_df)


-def test_read_csv_gcs_bq_engine(session, scalars_dfs, gcs_folder):
-    scalars_df, _ = scalars_dfs
+def test_read_csv_for_gcs_file_w_bq_engine(session, scalars_dfs, gcs_folder):
+    scalars_df, scalars_pandas_df = scalars_dfs
     path = gcs_folder + "test_read_csv_gcs_bq_engine_w_index*.csv"
-    scalars_df.to_csv(path, index=False)
-    df = session.read_csv(
-        path,
-        engine="bigquery",
-        index_col=bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64,
-    )
-
-    # TODO(chelsealin): If we serialize the index, can more easily compare values.
-    pd.testing.assert_index_equal(df.columns, scalars_df.columns)
+    scalars_df.to_csv(path, index=True)
+    result_df = session.read_csv(path, engine="bigquery", index_col="rowindex")

     # The auto detects of BigQuery load job have restrictions to detect the bytes,
     # datetime, numeric and geometry types, so they're skipped here.
-    df = df.drop(columns=["bytes_col", "datetime_col", "numeric_col", "geography_col"])
-    scalars_df = scalars_df.drop(
-        columns=["bytes_col", "datetime_col", "numeric_col", "geography_col"]
-    )
-    assert df.shape[0] == scalars_df.shape[0]
-    pd.testing.assert_series_equal(df.dtypes, scalars_df.dtypes)
+    drop_columns = ["bytes_col", "datetime_col", "numeric_col", "geography_col"]
+    result_df = result_df.drop(columns=drop_columns)
+    scalars_pandas_df = scalars_pandas_df.drop(columns=drop_columns)
+    pd.testing.assert_frame_equal(result_df.to_pandas(), scalars_pandas_df)


 @pytest.mark.parametrize(
@@ -1091,28 +1087,23 @@ def test_read_csv_local_default_engine(session, scalars_dfs, sep):
     scalars_df, scalars_pandas_df = scalars_dfs
     with tempfile.TemporaryDirectory() as dir:
         path = dir + "/test_read_csv_local_default_engine.csv"
-        # Using the pandas to_csv method because the BQ one does not support local write.
-        scalars_pandas_df.to_csv(path, index=False, sep=sep)
+        scalars_df.to_csv(path, index=True, sep=sep)
         dtype = scalars_df.dtypes.to_dict()
         dtype.pop("geography_col")
-        df = session.read_csv(
+        result_df = session.read_csv(
             path,
             sep=sep,
             # Convert default pandas dtypes to match BigQuery DataFrames dtypes.
             dtype=dtype,
+            index_col="rowindex",
         )

-        # TODO(chelsealin): If we serialize the index, can more easily compare values.
-        pd.testing.assert_index_equal(df.columns, scalars_df.columns)
-
         # The auto detects of BigQuery load job have restrictions to detect the bytes,
-        # numeric and geometry types, so they're skipped here.
-        df = df.drop(columns=["bytes_col", "numeric_col", "geography_col"])
-        scalars_df = scalars_df.drop(
-            columns=["bytes_col", "numeric_col", "geography_col"]
-        )
-        assert df.shape[0] == scalars_df.shape[0]
-        pd.testing.assert_series_equal(df.dtypes, scalars_df.dtypes)
+        # datetime, numeric and geometry types, so they're skipped here.
+        drop_columns = ["bytes_col", "numeric_col", "geography_col"]
+        result_df = result_df.drop(columns=drop_columns)
+        scalars_pandas_df = scalars_pandas_df.drop(columns=drop_columns)
+        pd.testing.assert_frame_equal(result_df.to_pandas(), scalars_pandas_df)


 @pytest.mark.parametrize(
@@ -1126,47 +1117,35 @@ def test_read_csv_local_bq_engine(session, scalars_dfs, sep):
     scalars_df, scalars_pandas_df = scalars_dfs
     with tempfile.TemporaryDirectory() as dir:
         path = dir + "/test_read_csv_local_bq_engine.csv"
-        # Using the pandas to_csv method because the BQ one does not support local write.
-        scalars_pandas_df.to_csv(path, index=False, sep=sep)
-        df = session.read_csv(path, engine="bigquery", sep=sep)
-
-        # TODO(chelsealin): If we serialize the index, can more easily compare values.
-        pd.testing.assert_index_equal(df.columns, scalars_df.columns)
+        scalars_df.to_csv(path, index=True, sep=sep)
+        result_df = session.read_csv(
+            path, engine="bigquery", sep=sep, index_col="rowindex"
+        )

         # The auto detects of BigQuery load job have restrictions to detect the bytes,
         # datetime, numeric and geometry types, so they're skipped here.
-        df = df.drop(
-            columns=["bytes_col", "datetime_col", "numeric_col", "geography_col"]
-        )
-        scalars_df = scalars_df.drop(
-            columns=["bytes_col", "datetime_col", "numeric_col", "geography_col"]
-        )
-        assert df.shape[0] == scalars_df.shape[0]
-        pd.testing.assert_series_equal(df.dtypes, scalars_df.dtypes)
+        drop_columns = ["bytes_col", "datetime_col", "numeric_col", "geography_col"]
+        result_df = result_df.drop(columns=drop_columns)
+        scalars_pandas_df = scalars_pandas_df.drop(columns=drop_columns)
+        pd.testing.assert_frame_equal(result_df.to_pandas(), scalars_pandas_df)


 def test_read_csv_localbuffer_bq_engine(session, scalars_dfs):
     scalars_df, scalars_pandas_df = scalars_dfs
     with tempfile.TemporaryDirectory() as dir:
         path = dir + "/test_read_csv_local_bq_engine.csv"
-        # Using the pandas to_csv method because the BQ one does not support local write.
-        scalars_pandas_df.to_csv(path, index=False)
+        scalars_df.to_csv(path, index=True)
         with open(path, "rb") as buffer:
-            df = session.read_csv(buffer, engine="bigquery")
-
-        # TODO(chelsealin): If we serialize the index, can more easily compare values.
-        pd.testing.assert_index_equal(df.columns, scalars_df.columns)
+            result_df = session.read_csv(
+                buffer, engine="bigquery", index_col="rowindex"
+            )

         # The auto detects of BigQuery load job have restrictions to detect the bytes,
         # datetime, numeric and geometry types, so they're skipped here.
-        df = df.drop(
-            columns=["bytes_col", "datetime_col", "numeric_col", "geography_col"]
-        )
-        scalars_df = scalars_df.drop(
-            columns=["bytes_col", "datetime_col", "numeric_col", "geography_col"]
-        )
-        assert df.shape[0] == scalars_df.shape[0]
-        pd.testing.assert_series_equal(df.dtypes, scalars_df.dtypes)
+        drop_columns = ["bytes_col", "datetime_col", "numeric_col", "geography_col"]
+        result_df = result_df.drop(columns=drop_columns)
+        scalars_pandas_df = scalars_pandas_df.drop(columns=drop_columns)
+        pd.testing.assert_frame_equal(result_df.to_pandas(), scalars_pandas_df)


 def test_read_csv_bq_engine_supports_index_col_false(
```
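All of the rewritten read_csv tests share one round-trip pattern: serialize the index as a named column with `to_csv(index=True)`, then reattach it with `index_col="rowindex"`. The equivalent pattern in plain pandas (a minimal sketch with made-up data):

```python
import os
import tempfile

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})
df.index.name = "rowindex"

with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "roundtrip.csv")
    df.to_csv(path, index=True)  # write the index as a "rowindex" column
    restored = pd.read_csv(path, index_col="rowindex")  # restore it as the index

pd.testing.assert_frame_equal(df, restored)  # values and index both survive
```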
```diff
@@ -1420,19 +1399,16 @@ def test_read_csv_local_w_encoding(session, penguins_pandas_df_default_index, engine):
     with tempfile.TemporaryDirectory() as dir:
         path = dir + "/test_read_csv_local_w_encoding.csv"
         # Using the pandas to_csv method because the BQ one does not support local write.
-        penguins_pandas_df_default_index.to_csv(
-            path, index=False, encoding="ISO-8859-1"
-        )
+        penguins_pandas_df_default_index.index.name = "rowindex"
+        penguins_pandas_df_default_index.to_csv(path, index=True, encoding="ISO-8859-1")

         # File can only be read using the same character encoding as when written.
-        df = session.read_csv(path, engine=engine, encoding="ISO-8859-1")
-
-        # TODO(chelsealin): If we serialize the index, can more easily compare values.
-        pd.testing.assert_index_equal(
-            df.columns, penguins_pandas_df_default_index.columns
+        result_df = session.read_csv(
+            path, engine=engine, encoding="ISO-8859-1", index_col="rowindex"
+        )
+        pd.testing.assert_frame_equal(
+            result_df.to_pandas(), penguins_pandas_df_default_index
         )
-
-        assert df.shape[0] == penguins_pandas_df_default_index.shape[0]


 def test_read_pickle_local(session, penguins_pandas_df_default_index, tmp_path):
```
