Skip to content

feat: add ExternalCatalogTableOptions class and tests #2116

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions google/cloud/bigquery/external_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery import _helpers
from google.cloud.bigquery.format_options import AvroOptions, ParquetOptions
from google.cloud.bigquery import schema
from google.cloud.bigquery.schema import SchemaField


Expand Down Expand Up @@ -1077,3 +1078,109 @@ def from_api_repr(cls, api_repr: dict) -> ExternalCatalogDatasetOptions:
config = cls()
config._properties = api_repr
return config


class ExternalCatalogTableOptions:
    """Metadata about an open source compatible table. The fields contained in
    these options correspond to Hive metastore's table-level properties.

    Args:
        connection_id (Optional[str]): The connection specifying the credentials to be
            used to read external storage, such as Azure Blob, Cloud Storage, or
            S3. The connection is needed to read the open source table from
            BigQuery Engine. The connection_id can have the form
            ``<project_id>.<location_id>.<connection_id>`` or
            ``projects/<project_id>/locations/<location_id>/connections/<connection_id>``.
        parameters (Union[Dict[str, Any], None]): A map of key value pairs defining the parameters
            and properties of the open source table. Corresponds with Hive
            metastore table parameters. Maximum size of 4MiB.
        storage_descriptor (Optional[StorageDescriptor]): A storage descriptor containing information
            about the physical storage of this table.
    """

    def __init__(
        self,
        connection_id: Optional[str] = None,
        parameters: Union[Dict[str, Any], None] = None,
        storage_descriptor: Optional[schema.StorageDescriptor] = None,
    ):
        # All state lives in ``_properties`` in the API wire format; the
        # setters below validate and normalize the constructor arguments.
        self._properties: Dict[str, Any] = {}
        self.connection_id = connection_id
        self.parameters = parameters
        self.storage_descriptor = storage_descriptor

    @property
    def connection_id(self) -> Optional[str]:
        """Optional. The connection specifying the credentials to be
        used to read external storage, such as Azure Blob, Cloud Storage, or
        S3. The connection is needed to read the open source table from
        BigQuery Engine. The connection_id can have the form
        ``<project_id>.<location_id>.<connection_id>`` or
        ``projects/<project_id>/locations/<location_id>/connections/<connection_id>``.
        """

        return self._properties.get("connectionId")

    @connection_id.setter
    def connection_id(self, value: Optional[str]):
        value = _helpers._isinstance_or_raise(value, str, none_allowed=True)
        self._properties["connectionId"] = value

    @property
    def parameters(self) -> Union[Dict[str, Any], None]:
        """Optional. A map of key value pairs defining the parameters and
        properties of the open source table. Corresponds with Hive
        metastore table parameters. Maximum size of 4MiB.
        """

        return self._properties.get("parameters")

    @parameters.setter
    def parameters(self, value: Union[Dict[str, Any], None]):
        value = _helpers._isinstance_or_raise(value, dict, none_allowed=True)
        self._properties["parameters"] = value

    @property
    def storage_descriptor(self) -> Optional[schema.StorageDescriptor]:
        """Optional. A storage descriptor containing information about the
        physical storage of this table.

        Returns a freshly constructed ``StorageDescriptor`` built from the
        stored API representation, or ``None`` when unset.
        """

        prop = _helpers._get_sub_prop(self._properties, ["storageDescriptor"])

        if prop is not None:
            return schema.StorageDescriptor.from_api_repr(prop)
        return None

    @storage_descriptor.setter
    def storage_descriptor(self, value: Union[schema.StorageDescriptor, dict, None]):
        value = _helpers._isinstance_or_raise(
            value, (schema.StorageDescriptor, dict), none_allowed=True
        )
        # Normalize objects to their API dict form so ``_properties`` always
        # holds the wire representation.
        if isinstance(value, schema.StorageDescriptor):
            self._properties["storageDescriptor"] = value.to_api_repr()
        else:
            self._properties["storageDescriptor"] = value

    def to_api_repr(self) -> dict:
        """Build an API representation of this object.

        Returns:
            Dict[str, Any]:
                A dictionary in the format used by the BigQuery API.

        NOTE: this returns the live internal dictionary (consistent with the
        sibling ``ExternalCatalogDatasetOptions`` class); mutating it mutates
        this object.
        """

        return self._properties

    @classmethod
    def from_api_repr(cls, api_repr: dict) -> ExternalCatalogTableOptions:
        """Factory: constructs an instance of the class (cls)
        given its API representation.

        Args:
            api_repr (Dict[str, Any]):
                API representation of the object to be instantiated.

        Returns:
            An instance of the class initialized with data from 'api_repr'.
        """
        config = cls()
        config._properties = api_repr
        return config
2 changes: 1 addition & 1 deletion google/cloud/bigquery/magics/magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
bigquery_magics = None


IPYTHON_USER_AGENT = "ipython-{}".format(IPython.__version__)
IPYTHON_USER_AGENT = "ipython-{}".format(IPython.__version__) # type: ignore


class Context(object):
Expand Down
35 changes: 35 additions & 0 deletions google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
from google.cloud.bigquery.schema import _build_schema_resource
from google.cloud.bigquery.schema import _parse_schema_resource
from google.cloud.bigquery.schema import _to_schema_fields
from google.cloud.bigquery import external_config

if typing.TYPE_CHECKING: # pragma: NO COVER
# Unconditionally import optional dependencies again to tell pytype that
Expand Down Expand Up @@ -408,6 +409,7 @@ class Table(_TableBase):
"require_partition_filter": "requirePartitionFilter",
"table_constraints": "tableConstraints",
"max_staleness": "maxStaleness",
"external_catalog_table_options": "externalCatalogTableOptions",
}

def __init__(self, table_ref, schema=None) -> None:
Expand Down Expand Up @@ -1023,6 +1025,39 @@ def table_constraints(self) -> Optional["TableConstraints"]:
table_constraints = TableConstraints.from_api_repr(table_constraints)
return table_constraints

@property
def external_catalog_table_options(
    self,
) -> Optional[external_config.ExternalCatalogTableOptions]:
    """Options defining an open source compatible table living in the
    BigQuery catalog. Contains metadata about the open source table
    represented by the current table (e.g. Hive metastore-style
    connection, parameters, and storage descriptor).

    Returns ``None`` when the underlying table resource has no
    ``externalCatalogTableOptions`` field.
    """

    prop = self._properties.get(
        self._PROPERTY_TO_API_FIELD["external_catalog_table_options"]
    )
    if prop is not None:
        return external_config.ExternalCatalogTableOptions.from_api_repr(prop)
    return None

@external_catalog_table_options.setter
def external_catalog_table_options(
    self, value: Union[external_config.ExternalCatalogTableOptions, dict, None]
):
    """Store options as their API representation on the table resource.

    Accepts an ``ExternalCatalogTableOptions`` instance, a raw API dict,
    or ``None``; any other type raises ``TypeError``.
    """
    checked = _helpers._isinstance_or_raise(
        value,
        (external_config.ExternalCatalogTableOptions, dict),
        none_allowed=True,
    )
    # Normalize option objects to the wire-format dict before storing so
    # ``_properties`` always holds the API representation.
    if isinstance(checked, external_config.ExternalCatalogTableOptions):
        checked = checked.to_api_repr()
    api_field = self._PROPERTY_TO_API_FIELD["external_catalog_table_options"]
    self._properties[api_field] = checked

@classmethod
def from_string(cls, full_table_id: str) -> "Table":
"""Construct a table from fully-qualified table ID.
Expand Down
137 changes: 137 additions & 0 deletions tests/unit/test_external_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import base64
import copy
from typing import Any, Dict, Optional
import unittest

from google.cloud.bigquery import external_config
Expand Down Expand Up @@ -979,3 +980,139 @@ def test_from_api_repr(self):

assert isinstance(result, external_config.ExternalCatalogDatasetOptions)
assert result._properties == api_repr


class TestExternalCatalogTableOptions:
    """Unit tests for ``ExternalCatalogTableOptions``."""

    @staticmethod
    def _get_target_class():
        from google.cloud.bigquery.external_config import ExternalCatalogTableOptions

        return ExternalCatalogTableOptions

    def _make_one(self, *args, **kw):
        return self._get_target_class()(*args, **kw)

    # Shared fixtures: a storage descriptor in API (dict) form and the
    # equivalent object form, plus a full options API representation.
    storage_descriptor_repr = {
        "inputFormat": "testpath.to.OrcInputFormat",
        "locationUri": "gs://test/path/",
        "outputFormat": "testpath.to.OrcOutputFormat",
        "serDeInfo": {
            "serializationLibrary": "testpath.to.LazySimpleSerDe",
            "name": "serde_lib_name",
            "parameters": {"key": "value"},
        },
    }

    CONNECTIONID = "connection123"
    PARAMETERS = {"key": "value"}
    STORAGEDESCRIPTOR = schema.StorageDescriptor.from_api_repr(storage_descriptor_repr)
    EXTERNALCATALOGTABLEOPTIONS = {
        "connectionId": "connection123",
        "parameters": {"key": "value"},
        "storageDescriptor": STORAGEDESCRIPTOR.to_api_repr(),
    }

    @pytest.mark.parametrize(
        "connection_id,parameters,storage_descriptor",
        [
            (
                CONNECTIONID,
                PARAMETERS,
                STORAGEDESCRIPTOR,
            ),  # set all parameters at once
            (CONNECTIONID, None, None),  # set only one parameter at a time
            (None, PARAMETERS, None),
            (None, None, STORAGEDESCRIPTOR),  # set storage descriptor using obj
            (None, None, storage_descriptor_repr),  # set storage descriptor using dict
            (None, None, None),  # use default parameters
        ],
    )
    def test_ctor_initialization(
        self,
        connection_id,
        parameters,
        storage_descriptor,
    ):
        """Constructor accepts each argument independently and round-trips it."""
        instance = self._make_one(
            connection_id=connection_id,
            parameters=parameters,
            storage_descriptor=storage_descriptor,
        )

        assert instance.connection_id == connection_id
        assert instance.parameters == parameters

        # The getter returns a StorageDescriptor object regardless of whether
        # the input was an object or an API dict, so compare API forms.
        if isinstance(storage_descriptor, schema.StorageDescriptor):
            assert (
                instance.storage_descriptor.to_api_repr()
                == storage_descriptor.to_api_repr()
            )
        elif isinstance(storage_descriptor, dict):
            assert instance.storage_descriptor.to_api_repr() == storage_descriptor
        else:
            assert instance.storage_descriptor is None

    @pytest.mark.parametrize(
        "connection_id,parameters,storage_descriptor",
        [
            pytest.param(
                123,
                PARAMETERS,
                STORAGEDESCRIPTOR,
                id="connection_id-invalid-type",
            ),
            pytest.param(
                CONNECTIONID,
                123,
                STORAGEDESCRIPTOR,
                id="parameters-invalid-type",
            ),
            pytest.param(
                CONNECTIONID,
                PARAMETERS,
                123,
                id="storage_descriptor-invalid-type",
            ),
        ],
    )
    def test_ctor_invalid_input(
        self,
        connection_id: str,
        parameters: Dict[str, Any],
        storage_descriptor: Optional[schema.StorageDescriptor],
    ):
        """Each argument is type-checked; a wrong type raises TypeError."""
        with pytest.raises(TypeError) as e:
            external_config.ExternalCatalogTableOptions(
                connection_id=connection_id,
                parameters=parameters,
                storage_descriptor=storage_descriptor,
            )

        # Looking for the first word from the string "Pass <variable> as..."
        assert "Pass " in str(e.value)

    def test_to_api_repr(self):
        """A fully populated instance serializes to the expected API dict."""
        instance = self._make_one(
            connection_id=self.CONNECTIONID,
            parameters=self.PARAMETERS,
            storage_descriptor=self.STORAGEDESCRIPTOR,
        )

        result = instance.to_api_repr()
        expected = self.EXTERNALCATALOGTABLEOPTIONS

        assert result == expected

    def test_from_api_repr(self):
        """The factory builds an instance whose properties match the input."""
        instance = self._make_one()
        api_repr = self.EXTERNALCATALOGTABLEOPTIONS
        result = instance.from_api_repr(api_repr)

        assert isinstance(result, external_config.ExternalCatalogTableOptions)
        assert result._properties == api_repr
Loading
Loading