
Commit e86c263

tswast authored and emar-kar committed
Optionally include indexes in table written by load_table_from_dataframe. (googleapis#9084)
* Specify the index data type in a partial schema to `load_table_from_dataframe` to include it. If an index (or a level of a multi-index) has a name and is present in the schema passed to `load_table_from_dataframe`, that index is serialized and written to the table. Otherwise, the index is omitted from the serialized table.
* Don't include an index if it has the same name as a column.
* Move the `load_table_dataframe` sample from `snippets.py` to `samples/`. The sample now demonstrates how to manually include the index with a partial schema definition. Update the docs reference to the new `load_table_dataframe` sample location.
1 parent b9c0892 commit e86c263
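In short: a named index (or multi-index level) is written to the destination table only when the schema passed to `load_table_from_dataframe` names it. A minimal sketch of the new behavior, assuming a hypothetical destination table ID:

```python
import pandas

from google.cloud import bigquery

client = bigquery.Client()

dataframe = pandas.DataFrame(
    {"title": [u"Life of Brian"], "release_year": [1979]},
    index=pandas.Index([u"Q24953"], name="wikidata_id"),
)

job_config = bigquery.LoadJobConfig(
    schema=[
        # "title" is listed because its pandas dtype ("object") is ambiguous.
        bigquery.SchemaField("title", bigquery.enums.SqlTypeNames.STRING),
        # Naming the "wikidata_id" index opts it in; leaving it out of the
        # schema would omit the index from the loaded table.
        bigquery.SchemaField("wikidata_id", bigquery.enums.SqlTypeNames.STRING),
    ]
)
job = client.load_table_from_dataframe(
    dataframe,
    "your-project.your_dataset.films",  # hypothetical table ID
    job_config=job_config,
)
job.result()
```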

7 files changed: +421 -57 lines changed

bigquery/docs/snippets.py

-47
@@ -2512,52 +2512,5 @@ def test_list_rows_as_dataframe(client):
     assert len(df) == table.num_rows  # verify the number of rows


-@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
-@pytest.mark.parametrize("parquet_engine", ["pyarrow", "fastparquet"])
-def test_load_table_from_dataframe(client, to_delete, parquet_engine):
-    if parquet_engine == "pyarrow" and pyarrow is None:
-        pytest.skip("Requires `pyarrow`")
-    if parquet_engine == "fastparquet" and fastparquet is None:
-        pytest.skip("Requires `fastparquet`")
-
-    pandas.set_option("io.parquet.engine", parquet_engine)
-
-    dataset_id = "load_table_from_dataframe_{}".format(_millis())
-    dataset = bigquery.Dataset(client.dataset(dataset_id))
-    client.create_dataset(dataset)
-    to_delete.append(dataset)
-
-    # [START bigquery_load_table_dataframe]
-    # from google.cloud import bigquery
-    # import pandas
-    # client = bigquery.Client()
-    # dataset_id = 'my_dataset'
-
-    dataset_ref = client.dataset(dataset_id)
-    table_ref = dataset_ref.table("monty_python")
-    records = [
-        {"title": u"The Meaning of Life", "release_year": 1983},
-        {"title": u"Monty Python and the Holy Grail", "release_year": 1975},
-        {"title": u"Life of Brian", "release_year": 1979},
-        {"title": u"And Now for Something Completely Different", "release_year": 1971},
-    ]
-    # Optionally set explicit indices.
-    # If indices are not specified, a column will be created for the default
-    # indices created by pandas.
-    index = [u"Q24980", u"Q25043", u"Q24953", u"Q16403"]
-    dataframe = pandas.DataFrame(records, index=pandas.Index(index, name="wikidata_id"))
-
-    job = client.load_table_from_dataframe(dataframe, table_ref, location="US")
-
-    job.result()  # Waits for table load to complete.
-
-    assert job.state == "DONE"
-    table = client.get_table(table_ref)
-    assert table.num_rows == 4
-    # [END bigquery_load_table_dataframe]
-    column_names = [field.name for field in table.schema]
-    assert sorted(column_names) == ["release_year", "title", "wikidata_id"]
-
-
 if __name__ == "__main__":
     pytest.main()

bigquery/docs/usage/pandas.rst

+1 -1
@@ -55,7 +55,7 @@ install the BigQuery python client library with :mod:`pandas` and
 The following example demonstrates how to create a :class:`pandas.DataFrame`
 and load it into a new table:

-.. literalinclude:: ../snippets.py
+.. literalinclude:: ../samples/load_table_dataframe.py
    :language: python
    :dedent: 4
    :start-after: [START bigquery_load_table_dataframe]

bigquery/google/cloud/bigquery/__init__.py

+2
@@ -36,6 +36,7 @@
 from google.cloud.bigquery.dataset import AccessEntry
 from google.cloud.bigquery.dataset import Dataset
 from google.cloud.bigquery.dataset import DatasetReference
+from google.cloud.bigquery import enums
 from google.cloud.bigquery.enums import StandardSqlDataTypes
 from google.cloud.bigquery.external_config import ExternalConfig
 from google.cloud.bigquery.external_config import BigtableOptions
@@ -124,6 +125,7 @@
     "GoogleSheetsOptions",
     "DEFAULT_RETRY",
     # Enum Constants
+    "enums",
     "Compression",
     "CreateDisposition",
     "DestinationFormat",

bigquery/google/cloud/bigquery/_pandas_helpers.py

+57 -7
@@ -187,6 +187,49 @@ def bq_to_arrow_array(series, bq_field):
     return pyarrow.array(series, type=arrow_type)


+def get_column_or_index(dataframe, name):
+    """Return a column or index as a pandas series."""
+    if name in dataframe.columns:
+        return dataframe[name].reset_index(drop=True)
+
+    if isinstance(dataframe.index, pandas.MultiIndex):
+        if name in dataframe.index.names:
+            return (
+                dataframe.index.get_level_values(name)
+                .to_series()
+                .reset_index(drop=True)
+            )
+    else:
+        if name == dataframe.index.name:
+            return dataframe.index.to_series().reset_index(drop=True)
+
+    raise ValueError("column or index '{}' not found.".format(name))
+
+
+def list_columns_and_indexes(dataframe):
+    """Return all index and column names with dtypes.
+
+    Returns:
+        Sequence[Tuple[str, dtype]]:
+            Returns a list of index and column names with
+            corresponding dtypes. If an index is missing a name or has the
+            same name as a column, the index is omitted.
+    """
+    column_names = frozenset(dataframe.columns)
+    columns_and_indexes = []
+    if isinstance(dataframe.index, pandas.MultiIndex):
+        for name in dataframe.index.names:
+            if name and name not in column_names:
+                values = dataframe.index.get_level_values(name)
+                columns_and_indexes.append((name, values.dtype))
+    else:
+        if dataframe.index.name and dataframe.index.name not in column_names:
+            columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype))
+
+    columns_and_indexes += zip(dataframe.columns, dataframe.dtypes)
+    return columns_and_indexes
+
+
 def dataframe_to_bq_schema(dataframe, bq_schema):
     """Convert a pandas DataFrame schema to a BigQuery schema.

@@ -217,7 +260,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
     bq_schema_unused = set()

     bq_schema_out = []
-    for column, dtype in zip(dataframe.columns, dataframe.dtypes):
+    for column, dtype in list_columns_and_indexes(dataframe):
         # Use provided type from schema, if present.
         bq_field = bq_schema_index.get(column)
         if bq_field:
@@ -229,7 +272,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
         # pandas dtype.
         bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
         if not bq_type:
-            warnings.warn("Unable to determine type of column '{}'.".format(column))
+            warnings.warn(u"Unable to determine type of column '{}'.".format(column))
             return None
         bq_field = schema.SchemaField(column, bq_type)
         bq_schema_out.append(bq_field)
@@ -238,7 +281,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
     # column, but it was not found.
     if bq_schema_unused:
         raise ValueError(
-            "bq_schema contains fields not present in dataframe: {}".format(
+            u"bq_schema contains fields not present in dataframe: {}".format(
                 bq_schema_unused
             )
         )
@@ -261,20 +304,25 @@ def dataframe_to_arrow(dataframe, bq_schema):
             BigQuery schema.
     """
     column_names = set(dataframe.columns)
+    column_and_index_names = set(
+        name for name, _ in list_columns_and_indexes(dataframe)
+    )
     bq_field_names = set(field.name for field in bq_schema)

-    extra_fields = bq_field_names - column_names
+    extra_fields = bq_field_names - column_and_index_names
     if extra_fields:
         raise ValueError(
-            "bq_schema contains fields not present in dataframe: {}".format(
+            u"bq_schema contains fields not present in dataframe: {}".format(
                 extra_fields
             )
         )

+    # It's okay for indexes to be missing from bq_schema, but it's not okay to
+    # be missing columns.
     missing_fields = column_names - bq_field_names
     if missing_fields:
         raise ValueError(
-            "bq_schema is missing fields from dataframe: {}".format(missing_fields)
+            u"bq_schema is missing fields from dataframe: {}".format(missing_fields)
         )

     arrow_arrays = []
@@ -283,7 +331,9 @@ def dataframe_to_arrow(dataframe, bq_schema):
     for bq_field in bq_schema:
         arrow_fields.append(bq_to_arrow_field(bq_field))
         arrow_names.append(bq_field.name)
-        arrow_arrays.append(bq_to_arrow_array(dataframe[bq_field.name], bq_field))
+        arrow_arrays.append(
+            bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field)
+        )

     if all((field is not None for field in arrow_fields)):
         return pyarrow.Table.from_arrays(
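A quick sketch of how the new helpers behave on a DataFrame with a named index (`_pandas_helpers` is a private module, so this is only to illustrate the behavior, not a supported API):

```python
import pandas

from google.cloud.bigquery import _pandas_helpers

dataframe = pandas.DataFrame(
    {"title": [u"Life of Brian"], "release_year": [1979]},
    index=pandas.Index([u"Q24953"], name="wikidata_id"),
)

# The named index is listed alongside the columns with its dtype, e.g.
# [('wikidata_id', dtype('O')), ('title', dtype('O')), ('release_year', dtype('int64'))]
print(_pandas_helpers.list_columns_and_indexes(dataframe))

# ...and it can be fetched just like a column.
print(_pandas_helpers.get_column_or_index(dataframe, "wikidata_id"))
```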
bigquery/samples/load_table_dataframe.py

+73
@@ -0,0 +1,73 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def load_table_dataframe(client, table_id):
+    # [START bigquery_load_table_dataframe]
+    from google.cloud import bigquery
+    import pandas
+
+    # TODO(developer): Construct a BigQuery client object.
+    # client = bigquery.Client()
+
+    # TODO(developer): Set table_id to the ID of the table to create.
+    # table_id = "your-project.your_dataset.your_table_name"
+
+    records = [
+        {"title": u"The Meaning of Life", "release_year": 1983},
+        {"title": u"Monty Python and the Holy Grail", "release_year": 1975},
+        {"title": u"Life of Brian", "release_year": 1979},
+        {"title": u"And Now for Something Completely Different", "release_year": 1971},
+    ]
+    dataframe = pandas.DataFrame(
+        records,
+        # In the loaded table, the column order reflects the order of the
+        # columns in the DataFrame.
+        columns=["title", "release_year"],
+        # Optionally, set a named index, which can also be written to the
+        # BigQuery table.
+        index=pandas.Index(
+            [u"Q24980", u"Q25043", u"Q24953", u"Q16403"], name="wikidata_id"
+        ),
+    )
+    job_config = bigquery.LoadJobConfig(
+        # Specify a (partial) schema. All columns are always written to the
+        # table. The schema is used to assist in data type definitions.
+        schema=[
+            # Specify the type of columns whose type cannot be auto-detected. For
+            # example the "title" column uses pandas dtype "object", so its
+            # data type is ambiguous.
+            bigquery.SchemaField("title", bigquery.enums.SqlTypeNames.STRING),
+            # Indexes are written if included in the schema by name.
+            bigquery.SchemaField("wikidata_id", bigquery.enums.SqlTypeNames.STRING),
+        ],
+        # Optionally, set the write disposition. BigQuery appends loaded rows
+        # to an existing table by default, but with WRITE_TRUNCATE write
+        # disposition it replaces the table with the loaded data.
+        write_disposition="WRITE_TRUNCATE",
+    )
+
+    job = client.load_table_from_dataframe(
+        dataframe, table_id, job_config=job_config, location="US"
+    )
+    job.result()  # Waits for table load to complete.
+
+    table = client.get_table(table_id)
+    print(
+        "Loaded {} rows and {} columns to {}".format(
+            table.num_rows, len(table.schema), table_id
+        )
+    )
+    # [END bigquery_load_table_dataframe]
+    return table
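A sketch of calling the new sample by hand; the table ID is a placeholder, and the import assumes the `samples/` directory is on `sys.path`:

```python
from google.cloud import bigquery

import load_table_dataframe  # assumes samples/ is the working directory

client = bigquery.Client()
table = load_table_dataframe.load_table_dataframe(
    client, "your-project.your_dataset.monty_python"  # placeholder table ID
)
# Prints e.g.: Loaded 4 rows and 3 columns to your-project.your_dataset.monty_python
```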
bigquery/samples/tests/test_load_table_dataframe.py

+30
@@ -0,0 +1,30 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from .. import load_table_dataframe
+
+
+pytest.importorskip("pandas")
+pytest.importorskip("pyarrow")
+
+
+def test_load_table_dataframe(capsys, client, random_table_id):
+    table = load_table_dataframe.load_table_dataframe(client, random_table_id)
+    out, _ = capsys.readouterr()
+    assert "Loaded 4 rows and 3 columns" in out
+
+    column_names = [field.name for field in table.schema]
+    assert column_names == ["wikidata_id", "title", "release_year"]
