Skip to content

Snow 2013774 server side structured types bindings #2343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions src/snowflake/connector/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
OCSPMode,
QueryStatus,
)
from .converter import SnowflakeConverter
from .converter import SnowflakeConverter, infer_snowflake_type
from .cursor import LOG_MAX_QUERY_LENGTH, SnowflakeCursor
from .description import (
CLIENT_NAME,
Expand Down Expand Up @@ -119,6 +119,7 @@
from .sqlstate import SQLSTATE_CONNECTION_NOT_EXISTS, SQLSTATE_FEATURE_NOT_SUPPORTED
from .telemetry import TelemetryClient, TelemetryData, TelemetryField
from .time_util import HeartBeatTimer, get_time_millis
from .type_wrappers import snowflake_type_wrapper
from .url_util import extract_top_level_domain_from_hostname
from .util_text import construct_hostname, parse_account, split_statements
from .wif_util import AttestationProvider
Expand Down Expand Up @@ -1737,7 +1738,7 @@ def _get_snowflake_type_and_binding(
)
snowflake_type, v = v
else:
snowflake_type = self.converter.snowflake_type(v)
snowflake_type = infer_snowflake_type(v)
if snowflake_type is None:
Error.errorhandler_wrapper(
self,
Expand All @@ -1756,6 +1757,9 @@ def _get_snowflake_type_and_binding(
self.converter.to_snowflake_bindings(snowflake_type, v),
)

def _is_complex_type(self, snowflake_type: str):
return snowflake_type in ("VARIANT", "OBJECT", "ARRAY", "MAP")

# TODO we could probably rework this to not make dicts like this: {'1': 'value', '2': '13'}
def _process_params_qmarks(
self,
Expand All @@ -1769,8 +1773,12 @@ def _process_params_qmarks(
get_type_and_binding = partial(self._get_snowflake_type_and_binding, cursor)

for idx, v in enumerate(params):
if isinstance(v, list):
snowflake_type = self.converter.snowflake_type(v)
if isinstance(v, snowflake_type_wrapper):
processed_params[str(idx + 1)] = (
self.converter.to_snowflake_bindings_dict("", v)
)
elif isinstance(v, list):
snowflake_type = infer_snowflake_type(v)
all_param_data = list(map(get_type_and_binding, v))
first_type = all_param_data[0].type
# if all elements have the same snowflake type, update snowflake_type
Expand Down
229 changes: 223 additions & 6 deletions src/snowflake/connector/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datetime import date, datetime
from datetime import time as dt_t
from datetime import timedelta, timezone, tzinfo
from decimal import Decimal
from functools import partial
from logging import getLogger
from math import ceil
Expand All @@ -22,6 +23,12 @@
from .errors import ProgrammingError
from .sfbinaryformat import binary_to_python, binary_to_snowflake
from .sfdatetime import sfdatetime_total_seconds_from_timedelta
from .type_wrappers import (
snowflake_array,
snowflake_map,
snowflake_object,
snowflake_variant,
)

if TYPE_CHECKING:
from numpy import bool_, int64
Expand Down Expand Up @@ -81,6 +88,8 @@
# Type alias
SnowflakeConverterType = Callable[[Any], Any]

JSON_FORMAT_STR = "json"


def convert_datetime_to_epoch(dt: datetime) -> float:
"""Converts datetime to epoch time in seconds.
Expand Down Expand Up @@ -147,6 +156,12 @@ def _generate_tzinfo_from_tzoffset(tzoffset_minutes: int) -> tzinfo:
return pytz.FixedOffset(tzoffset_minutes)


def infer_snowflake_type(value: Any) -> str | None:
"""Returns Snowflake data type for the value. This is used for qmark parameter style."""
type_name = value.__class__.__name__.lower()
return PYTHON_TO_SNOWFLAKE_TYPE.get(type_name)


class SnowflakeConverter:
def __init__(self, **kwargs) -> None:
self._parameters: dict[str, str | int | bool] = {}
Expand Down Expand Up @@ -355,11 +370,6 @@ def _BOOLEAN_to_python(
) -> Callable:
return lambda value: value in ("1", "TRUE")

def snowflake_type(self, value: Any) -> str | None:
"""Returns Snowflake data type for the value. This is used for qmark parameter style."""
type_name = value.__class__.__name__.lower()
return PYTHON_TO_SNOWFLAKE_TYPE.get(type_name)

def to_snowflake_bindings(self, snowflake_type: str, value: Any) -> str:
"""Converts Python data to snowflake data for qmark and numeric parameter style.

Expand All @@ -370,10 +380,32 @@ def to_snowflake_bindings(self, snowflake_type: str, value: Any) -> str:
snowflake_type, value
)

def to_snowflake_bindings_dict(
self, snowflake_type: str, value: Any
) -> dict[str, Any]:
"""Converts Python data to snowflake bindings dict dor qmark and numeric parameter style.

The output is bound in a query in the server side.
"""
type_name = value.__class__.__name__.lower()
return getattr(self, f"_{type_name}_to_snowflake_bindings_dict")(
snowflake_type, value
)

def _str_to_snowflake_bindings(self, _, value: str) -> str:
# NOTE: str type is always taken as a text data and never binary
return str(value)

def _str_to_snowflake_bindings_dict(
self, snowflake_type: str, value: str
) -> dict[str, Any]:
return {
"type": snowflake_type,
"value": str(value),
"schema": None,
"fmt": JSON_FORMAT_STR,
}

def _date_to_snowflake_bindings_in_bulk_insertion(self, value: date) -> str:
# notes: this is for date type bulk insertion, it's different from non-bulk date type insertion flow
milliseconds = _convert_date_to_epoch_milliseconds(value)
Expand Down Expand Up @@ -403,6 +435,14 @@ def _bool_to_snowflake_bindings(self, _, value: bool) -> str:
def _nonetype_to_snowflake_bindings(self, *_) -> None:
return None

def _nonetype_to_snowflake_bindings_dict(self, *_) -> dict[str, Any]:
return {
"type": "ANY",
"value": None,
"schema": None,
"format": JSON_FORMAT_STR,
}

def _date_to_snowflake_bindings(self, _, value: date) -> str:
# this is for date type non-bulk insertion, it's different from bulk date type insertion flow
# milliseconds
Expand Down Expand Up @@ -474,6 +514,183 @@ def _timedelta_to_snowflake_bindings(
str(hours * 3600 + mins * 60 + secs) + f"{value.microseconds:06d}" + "000"
)

def _python_object_to_structured_type_field(self, value: Any) -> Any:
"""Converts a Python object to a structured type field."""
if isinstance(value, datetime):
return value.strftime("%a, %d %b %Y %H:%M:%S %Z")
elif isinstance(value, date):
return value.strftime("%Y-%m-%d")
elif isinstance(value, time.struct_time) or isinstance(value, dt_t):
return time.strftime("%a, %d %b %Y %H:%M:%S %Z", value)
elif isinstance(value, timedelta):
return self._timedelta_to_snowflake_bindings("TIME", value)
elif isinstance(value, bytes) or isinstance(value, bytearray):
return self._bytes_to_snowflake_bindings(None, value)
elif isinstance(value, numpy.int64):
return int(value)
elif isinstance(value, Decimal):
return float(value)
elif isinstance(value, snowflake_array):
return self._snowflake_array_to_snowflake_bindings(value)
elif isinstance(value, snowflake_object):
return self._snowflake_object_to_snowflake_bindings(value)
else:
return value

def _snowflake_array_to_snowflake_bindings(
self, value: snowflake_array
) -> list[Any]:
if not value:
return []

converted_values = []
# TODO: is this an edge case that needs to be handled

if value.original_type == bytearray:
# bytearray when converted to snowflake_array becomes an array of int. The reasonable expectation would be
# for it to be binded as an array of individual bytes the same way as array of bytes value is binded.
return [self._bytes_to_snowflake_bindings(None, bytes([v])) for v in value]

for v in value:
if isinstance(v, snowflake_object):
converted_values.append(self._snowflake_object_to_snowflake_bindings(v))
elif isinstance(v, snowflake_array):
converted_values.append(self._snowflake_array_to_snowflake_bindings(v))
else:
converted_values.append(self._python_object_to_structured_type_field(v))

return converted_values

def _snowflake_array_to_snowflake_bindings_dict(
self, _, value: snowflake_array
) -> dict[str, Any]:
if not value:
return {
"type": "ARRAY",
"value": "[]",
"fmt": JSON_FORMAT_STR,
"schema": None,
}

return {
"type": "ARRAY",
"value": json.dumps(self._snowflake_array_to_snowflake_bindings(value)),
"fmt": JSON_FORMAT_STR,
}

def _snowflake_object_to_snowflake_bindings(
self, value: snowflake_object
) -> dict[str, Any]:
if not value:
return {}

converted_object = {}

for key, v in value.items():
if type(key) is not str:
logger.info(
"snowflake_object key %s is not a string. Converting to string.",
key,
)
key = str(key)

converted_object[key] = self._python_object_to_structured_type_field(v)

return converted_object

def _snowflake_object_to_snowflake_bindings_dict(
self, _, value: snowflake_object
) -> dict[str, Any]:
if not value:
return {
"type": "OBJECT",
"value": "{}",
"fmt": JSON_FORMAT_STR,
"schema": None,
}

return {
"type": "OBJECT",
"value": json.dumps(self._snowflake_object_to_snowflake_bindings(value)),
"fmt": JSON_FORMAT_STR,
"schema": None,
}

def _snowflake_variant_to_snowflake_bindings_dict(
self, _, value: snowflake_variant
) -> dict[str, Any]:
return {
"type": "VARIANT",
"value": json.dumps(value.value),
"fmt": JSON_FORMAT_STR,
"schema": None,
}

def _snowflake_map_to_snowflake_bindings(
self, value: snowflake_map
) -> dict[Any, Any]:
"""Converts snowflake_map to a dictionary for binding."""
if not value:
return {}

converted_map = {}
key_type = None
value_type = None
for key, v in value.items():
new_key_type = infer_snowflake_type(key)
new_value_type = infer_snowflake_type(v)

if key_type and new_key_type != key_type:
raise ValueError("Keys in snowflake_map must be of the same type.")
else:
key_type = new_key_type

if value_type and new_value_type != value_type:
raise ValueError("Values in snowflake_map must be of the same type.")
else:
value_type = new_value_type

if key in converted_map:
logger.warning(
"Duplicate key found in snowflake_map: %s. Overwriting the value.",
key,
)
converted_map[key] = self._python_object_to_structured_type_field(v)

return converted_map

def _snowflake_map_to_snowflake_bindings_dict(
self, _, value: snowflake_map
) -> dict[str, Any]:
if not value:
return {
"type": "OBJECT",
"value": "{}",
"fmt": JSON_FORMAT_STR,
"schema": None,
}

return {
"type": "OBJECT",
"value": json.dumps(self._snowflake_map_to_snowflake_bindings(value)),
"nullable": True,
"fmt": JSON_FORMAT_STR,
"schema": {
"type": "MAP",
"nullable": True,
"fields": [
{
"type": value.key_type,
**value.key_attributes,
},
{
"type": value.value_type,
**value.value_attributes,
},
],
},
}

def to_snowflake(self, value: Any) -> Any:
"""Converts Python data to Snowflake data for pyformat/format style.

Expand Down Expand Up @@ -690,7 +907,7 @@ def to_csv_bindings(self, value: tuple[str, Any] | Any) -> str | None:
# to_csv_bindings is only used in bulk insertion logic
val = self._date_to_snowflake_bindings_in_bulk_insertion(value)
else:
_type = self.snowflake_type(value)
_type = infer_snowflake_type(value)
val = self.to_snowflake_bindings(_type, value)
return self.escape_for_csv(val)

Expand Down
Loading
Loading