|
25 | 25 | import pandas
|
26 | 26 | except ImportError: # pragma: NO COVER
|
27 | 27 | pandas = None
|
| 28 | +else: |
| 29 | + import numpy |
28 | 30 |
|
29 | 31 | import pyarrow
|
30 | 32 | import pyarrow.parquet
|
31 | 33 |
|
try:
    # _BaseGeometry is used to detect shapely objects in `bq_to_arrow_array`.
    from shapely.geometry.base import BaseGeometry as _BaseGeometry
except ImportError:  # pragma: NO COVER
    # No shapely; use NoneType for _BaseGeometry as a placeholder.
    _BaseGeometry = type(None)
else:
    if pandas is not None:  # pragma: NO COVER

        def _to_wkb():
            # Create a closure that:
            # - Adds a not-null check. This allows the returned function to
            #   be used directly with apply, unlike `shapely.wkb.dumps`.
            # - Avoids extra work done by `shapely.wkb.dumps` that we don't need.
            # - Caches the WKBWriter (and the `write` method lookup).
            # - Avoids adding WKBWriter, lgeos, and notnull to the module namespace.
            from shapely.geos import WKBWriter, lgeos

            write = WKBWriter(lgeos).write
            notnull = pandas.notnull

            def _to_wkb(v):
                # Serialize one geometry to WKB bytes; pass null values through
                # unchanged so this works directly with `Series.apply`.
                return write(v) if notnull(v) else v

            return _to_wkb

        # Replace the factory with the converter it builds; from here on,
        # `_to_wkb` is the per-value geometry-to-WKB function.
        _to_wkb = _to_wkb()
32 | 62 | try:
|
33 | 63 | from google.cloud.bigquery_storage import ArrowSerializationOptions
|
34 | 64 | except ImportError:
|
|
71 | 101 | "uint8": "INTEGER",
|
72 | 102 | "uint16": "INTEGER",
|
73 | 103 | "uint32": "INTEGER",
|
| 104 | + "geometry": "GEOGRAPHY", |
74 | 105 | }
|
75 | 106 |
|
76 | 107 |
|
@@ -191,14 +222,16 @@ def bq_to_arrow_data_type(field):
|
191 | 222 | return data_type_constructor()
|
192 | 223 |
|
193 | 224 |
|
194 |
| -def bq_to_arrow_field(bq_field): |
| 225 | +def bq_to_arrow_field(bq_field, array_type=None): |
195 | 226 | """Return the Arrow field, corresponding to a given BigQuery column.
|
196 | 227 |
|
197 | 228 | Returns:
|
198 | 229 | None: if the Arrow type cannot be determined.
|
199 | 230 | """
|
200 | 231 | arrow_type = bq_to_arrow_data_type(bq_field)
|
201 |
| - if arrow_type: |
| 232 | + if arrow_type is not None: |
| 233 | + if array_type is not None: |
| 234 | + arrow_type = array_type # For GEOGRAPHY, at least initially |
202 | 235 | is_nullable = bq_field.mode.upper() == "NULLABLE"
|
203 | 236 | return pyarrow.field(bq_field.name, arrow_type, nullable=is_nullable)
|
204 | 237 |
|
@@ -245,7 +278,24 @@ def bq_schema_to_nullsafe_pandas_dtypes(
|
245 | 278 |
|
246 | 279 |
|
247 | 280 | def bq_to_arrow_array(series, bq_field):
|
248 |
| - arrow_type = bq_to_arrow_data_type(bq_field) |
| 281 | + if bq_field.field_type.upper() == "GEOGRAPHY": |
| 282 | + arrow_type = None |
| 283 | + first = _first_valid(series) |
| 284 | + if first is not None: |
| 285 | + if series.dtype.name == "geometry" or isinstance(first, _BaseGeometry): |
| 286 | + arrow_type = pyarrow.binary() |
| 287 | +            # Convert shapely geometry to WKB binary format:
| 288 | + series = series.apply(_to_wkb) |
| 289 | + elif isinstance(first, bytes): |
| 290 | + arrow_type = pyarrow.binary() |
| 291 | + elif series.dtype.name == "geometry": |
| 292 | + # We have a GeoSeries containing all nulls, convert it to a pandas series |
| 293 | + series = pandas.Series(numpy.array(series)) |
| 294 | + |
| 295 | + if arrow_type is None: |
| 296 | + arrow_type = bq_to_arrow_data_type(bq_field) |
| 297 | + else: |
| 298 | + arrow_type = bq_to_arrow_data_type(bq_field) |
249 | 299 |
|
250 | 300 | field_type_upper = bq_field.field_type.upper() if bq_field.field_type else ""
|
251 | 301 |
|
@@ -299,6 +349,12 @@ def list_columns_and_indexes(dataframe):
|
299 | 349 | return columns_and_indexes
|
300 | 350 |
|
301 | 351 |
|
| 352 | +def _first_valid(series): |
| 353 | + first_valid_index = series.first_valid_index() |
| 354 | + if first_valid_index is not None: |
| 355 | + return series.at[first_valid_index] |
| 356 | + |
| 357 | + |
302 | 358 | def dataframe_to_bq_schema(dataframe, bq_schema):
|
303 | 359 | """Convert a pandas DataFrame schema to a BigQuery schema.
|
304 | 360 |
|
@@ -339,6 +395,13 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
|
339 | 395 | # Otherwise, try to automatically determine the type based on the
|
340 | 396 | # pandas dtype.
|
341 | 397 | bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
|
| 398 | + if bq_type is None: |
| 399 | + sample_data = _first_valid(dataframe[column]) |
| 400 | + if ( |
| 401 | + isinstance(sample_data, _BaseGeometry) |
| 402 | + and sample_data is not None # Paranoia |
| 403 | + ): |
| 404 | + bq_type = "GEOGRAPHY" |
342 | 405 | bq_field = schema.SchemaField(column, bq_type)
|
343 | 406 | bq_schema_out.append(bq_field)
|
344 | 407 |
|
@@ -463,11 +526,11 @@ def dataframe_to_arrow(dataframe, bq_schema):
|
463 | 526 | arrow_names = []
|
464 | 527 | arrow_fields = []
|
465 | 528 | for bq_field in bq_schema:
|
466 |
| - arrow_fields.append(bq_to_arrow_field(bq_field)) |
467 | 529 | arrow_names.append(bq_field.name)
|
468 | 530 | arrow_arrays.append(
|
469 | 531 | bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field)
|
470 | 532 | )
|
| 533 | + arrow_fields.append(bq_to_arrow_field(bq_field, arrow_arrays[-1].type)) |
471 | 534 |
|
472 | 535 | if all((field is not None for field in arrow_fields)):
|
473 | 536 | return pyarrow.Table.from_arrays(
|
@@ -791,7 +854,13 @@ def dataframe_to_json_generator(dataframe):
|
791 | 854 | output = {}
|
792 | 855 | for column, value in zip(dataframe.columns, row):
|
793 | 856 | # Omit NaN values.
|
794 |
| - if pandas.isna(value): |
| 857 | + is_nan = pandas.isna(value) |
| 858 | + |
| 859 | + # isna() can also return an array-like of bools, but the latter's boolean |
| 860 | + # value is ambiguous, hence an extra check. An array-like value is *not* |
| 861 | + # considered a NaN, however. |
| 862 | + if isinstance(is_nan, bool) and is_nan: |
795 | 863 | continue
|
796 | 864 | output[column] = value
|
| 865 | + |
797 | 866 | yield output
|
0 commit comments