Skip to content

Commit 9ed4496

Browse files
committed
[WIP] support koalas and modin
add tests for koalas; fix type issues with koalas; patch to pd.Series, DataFrame; add datatype koalas tests; finish writing initial test suite for koalas; fix regressions; configure koalas; fix regressions; update pylint dep; update deps; update black; fix lint; use context manager for koalas ops_on_diff_frames; updates; update pre-commit; mypy typing ignore; fix docs; install hypothesis for koalas ci; don't cover modin import check; better handling of timestamp; fix koalas; wip; wip; wip; coverage hypothesis health check
1 parent c786f67 commit 9ed4496

24 files changed

+986
-118
lines changed

.github/workflows/ci-tests.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,13 @@ jobs:
145145
--non-interactive
146146
--session "tests-${{ matrix.python-version }}(extra='strategies', pandas='${{ matrix.pandas-version }}')"
147147
148+
- name: Unit Tests - Koalas
149+
run: >
150+
nox
151+
-db virtualenv -r -v
152+
--non-interactive
153+
--session "tests-${{ matrix.python-version }}(extra='koalas', pandas='${{ matrix.pandas-version }}')"
154+
148155
- name: Upload coverage to Codecov
149156
uses: "codecov/codecov-action@v1"
150157

docs/source/conf.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,18 @@
6464
else:
6565
SKIP_STRATEGY = False
6666
67+
try:
68+
import databricks.koalas
69+
except ImportError:
70+
KOALAS_INSTALLED = False
71+
else:
72+
KOALAS_INSTALLED = True
73+
6774
SKIP = sys.version_info < (3, 6)
6875
PY36 = sys.version_info < (3, 7)
6976
SKIP_PANDAS_LT_V1 = version.parse(pd.__version__).release < (1, 0) or PY36
7077
SKIP_SCALING = True
78+
SKIP_SCHEMA_MODEL = SKIP_PANDAS_LT_V1 or KOALAS_INSTALLED
7179
"""
7280

7381
doctest_default_flags = (

docs/source/schema_models.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,13 +194,13 @@ You must give a **type**, not an **instance**.
194194
:red:`✘` Bad:
195195

196196
.. testcode:: dataframe_schema_model
197-
:skipif: SKIP_PANDAS_LT_V1
197+
:skipif: SKIP_SCHEMA_MODEL
198198

199199
class Schema(pa.SchemaModel):
200200
a: Series[pd.StringDtype()]
201201

202202
.. testoutput:: dataframe_schema_model
203-
:skipif: SKIP_PANDAS_LT_V1
203+
:skipif: SKIP_SCHEMA_MODEL
204204

205205
Traceback (most recent call last):
206206
...

environment.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,18 @@ dependencies:
1919
- frictionless
2020
- pyarrow
2121

22+
# koalas extra
23+
- koalas
24+
- pyspark
25+
2226
# testing and dependencies
2327
- black >= 20.8b1
2428

2529
# testing
2630
- isort >= 5.7.0
2731
- codecov
2832
- mypy >= 0.902 # mypy no longer bundle stubs for third-party libraries
29-
- pylint = v2.11.1
33+
- pylint = 2.11.1
3034
- pytest
3135
- pytest-cov
3236
- pytest-xdist

noxfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ def install_extras(
184184
specs.append(
185185
spec if spec != "pandas" else f"pandas{pandas_version}"
186186
)
187-
if extra == "core":
187+
if extra in {"core", "koalas"}:
188188
specs.append(REQUIRES["all"]["hypothesis"])
189189

190190
# this is a temporary measure to install setuptools due to this issue:

pandera/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""A flexible and expressive pandas validation library."""
22
import platform
33

4+
from pandera import external_config
45
from pandera.dtypes import (
56
Bool,
67
Category,
@@ -56,4 +57,5 @@
5657
from .version import __version__
5758

5859
if platform.system() != "Windows":
60+
# pylint: disable=ungrouped-imports
5961
from pandera.dtypes import Complex256, Float128

pandera/check_utils.py

Lines changed: 94 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,88 @@
11
"""Utility functions for validation."""
22

3-
from typing import Optional, Tuple, Union
3+
from functools import lru_cache
4+
from typing import NamedTuple, Optional, Tuple, Union
45

56
import pandas as pd
67

8+
SupportedTypes = NamedTuple(
9+
"SupportedTypes",
10+
(
11+
("table_types", Tuple[type]),
12+
("field_types", Tuple[type]),
13+
("index_types", Tuple[type]),
14+
("multiindex_types", Tuple[type]),
15+
),
16+
)
17+
18+
19+
@lru_cache(maxsize=None)
20+
def _supported_types():
21+
# pylint: disable=import-outside-toplevel
22+
table_types = [pd.DataFrame]
23+
field_types = [pd.Series]
24+
index_types = [pd.Index]
25+
multiindex_types = [pd.MultiIndex]
26+
27+
try:
28+
import databricks.koalas as ks
29+
30+
table_types.append(ks.DataFrame)
31+
field_types.append(ks.Series)
32+
index_types.append(ks.Index)
33+
multiindex_types.append(ks.MultiIndex)
34+
except ImportError:
35+
pass
36+
try: # pragma: no cover
37+
import modin.pandas as mpd
38+
39+
table_types.append(mpd.DataFrame)
40+
field_types.append(mpd.Series)
41+
index_types.append(mpd.Index)
42+
multiindex_types.append(mpd.MultiIndex)
43+
except ImportError:
44+
pass
45+
46+
return SupportedTypes(
47+
tuple(table_types),
48+
tuple(field_types),
49+
tuple(index_types),
50+
tuple(multiindex_types),
51+
)
52+
53+
54+
def is_table(obj):
55+
"""Verifies whether an object is table-like.
56+
57+
Where a table is a 2-dimensional data matrix of rows and columns, which
58+
can be indexed in multiple different ways.
59+
"""
60+
return isinstance(obj, _supported_types().table_types)
61+
62+
63+
def is_field(obj):
64+
"""Verifies whether an object is field-like.
65+
66+
Where a field is a columnar representation of data in a table-like
67+
data structure.
68+
"""
69+
return isinstance(obj, _supported_types().field_types)
70+
71+
72+
def is_index(obj):
73+
"""Verifies whether an object is a table index."""
74+
return isinstance(obj, _supported_types().index_types)
75+
76+
77+
def is_multiindex(obj):
78+
"""Verifies whether an object is a multi-level table index."""
79+
return isinstance(obj, _supported_types().multiindex_types)
80+
81+
82+
def is_supported_check_obj(obj):
83+
"""Verifies whether an object is table- or field-like."""
84+
return is_table(obj) or is_field(obj)
85+
786

887
def prepare_series_check_output(
988
check_obj: Union[pd.Series, pd.DataFrame],
@@ -25,9 +104,20 @@ def prepare_series_check_output(
25104
check_output = check_output | isna
26105
failure_cases = check_obj[~check_output]
27106
if not failure_cases.empty and n_failure_cases is not None:
28-
failure_cases = failure_cases.groupby(check_output).head(
29-
n_failure_cases
30-
)
107+
# NOTE: this is a hack to support koalas, since you can't use groupby
108+
# on a dataframe with another dataframe
109+
if type(failure_cases).__module__.startswith("databricks.koalas"):
110+
failure_cases = (
111+
failure_cases.rename("failure_cases")
112+
.to_frame()
113+
.assign(check_output=check_output)
114+
.groupby("check_output")
115+
.head(n_failure_cases)["failure_cases"]
116+
)
117+
else:
118+
failure_cases = failure_cases.groupby(check_output).head(
119+
n_failure_cases
120+
)
31121
return check_output, failure_cases
32122

33123

pandera/checks.py

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -369,27 +369,27 @@ def __call__(
369369
``failure_cases``: subset of the check_object that failed.
370370
"""
371371
# prepare check object
372-
if isinstance(df_or_series, pd.Series) or (
373-
column is not None and isinstance(df_or_series, pd.DataFrame)
372+
if check_utils.is_field(df_or_series) or (
373+
column is not None and check_utils.is_table(df_or_series)
374374
):
375375
check_obj = self._prepare_series_input(df_or_series, column)
376-
elif isinstance(df_or_series, pd.DataFrame):
376+
elif check_utils.is_table(df_or_series):
377377
check_obj = self._prepare_dataframe_input(df_or_series)
378378
else:
379379
raise ValueError(
380-
f"object of type {df_or_series} not supported. Must be a "
381-
"Series, a dictionary of Series, or DataFrame"
380+
f"object of type {type(df_or_series)} not supported. Must be "
381+
"a Series, a dictionary of Series, or DataFrame"
382382
)
383383

384384
# apply check function to check object
385385
check_fn = partial(self._check_fn, **self._check_kwargs)
386386

387387
if self.element_wise:
388388
check_output = (
389-
check_obj.apply(check_fn, axis=1)
390-
if isinstance(check_obj, pd.DataFrame)
391-
else check_obj.map(check_fn)
392-
if isinstance(check_obj, pd.Series)
389+
check_obj.apply(check_fn, axis=1) # type: ignore
390+
if check_utils.is_table(check_obj)
391+
else check_obj.map(check_fn) # type: ignore
392+
if check_utils.is_field(check_obj)
393393
else check_fn(check_obj)
394394
)
395395
else:
@@ -401,12 +401,12 @@ def __call__(
401401
if (
402402
isinstance(check_obj, dict)
403403
or isinstance(check_output, bool)
404-
or not isinstance(check_output, (pd.Series, pd.DataFrame))
404+
or not check_utils.is_supported_check_obj(check_output)
405405
or check_obj.shape[0] != check_output.shape[0]
406406
or (check_obj.index != check_output.index).all()
407407
):
408408
failure_cases = None
409-
elif isinstance(check_output, pd.Series):
409+
elif check_utils.is_field(check_output):
410410
(
411411
check_output,
412412
failure_cases,
@@ -416,7 +416,7 @@ def __call__(
416416
ignore_na=self.ignore_na,
417417
n_failure_cases=self.n_failure_cases,
418418
)
419-
elif isinstance(check_output, pd.DataFrame):
419+
elif check_utils.is_table(check_output):
420420
(
421421
check_output,
422422
failure_cases,
@@ -434,12 +434,11 @@ def __call__(
434434

435435
check_passed = (
436436
check_output.all()
437-
if isinstance(check_output, pd.Series)
437+
if check_utils.is_field(check_output)
438438
else check_output.all(axis=None)
439-
if isinstance(check_output, pd.DataFrame)
439+
if check_utils.is_table(check_output)
440440
else check_output
441441
)
442-
443442
return CheckResult(
444443
check_output, check_passed, check_obj, failure_cases
445444
)

pandera/engines/pandas_engine.py

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -444,12 +444,28 @@ class NpString(numpy_engine.String):
444444
"""Specializes numpy_engine.String.coerce to handle pd.NA values."""
445445

446446
def coerce(self, data_container: PandasObject) -> np.ndarray:
447-
# Convert to object first to avoid
448-
# TypeError: object cannot be converted to an IntegerDtype
449-
data_container = data_container.astype(object)
450-
return data_container.where(
451-
data_container.isna(), data_container.astype(str)
452-
)
447+
def _to_str(obj):
448+
# NOTE: this is a hack to handle the following case:
449+
# koalas.Index doesn't support .where method yet, use numpy
450+
reverter = None
451+
if type(obj).__module__.startswith("databricks.koalas"):
452+
# pylint: disable=import-outside-toplevel
453+
import databricks.koalas as ks
454+
455+
if isinstance(obj, ks.Index):
456+
obj = obj.to_series()
457+
reverter = ks.Index
458+
else:
459+
obj = obj.astype(object)
460+
461+
obj = (
462+
obj.astype(str)
463+
if obj.notna().all(axis=None)
464+
else obj.where(obj.isna(), obj.astype(str))
465+
)
466+
return obj if reverter is None else reverter(obj)
467+
468+
return _to_str(data_container)
453469

454470
def check(self, pandera_dtype: dtypes.DataType) -> bool:
455471
return isinstance(pandera_dtype, (numpy_engine.Object, type(self)))
@@ -471,6 +487,7 @@ def check(self, pandera_dtype: dtypes.DataType) -> bool:
471487
object,
472488
np.object_,
473489
np.bytes_,
490+
np.string_,
474491
],
475492
)
476493

@@ -517,7 +534,18 @@ def __post_init__(self):
517534

518535
def coerce(self, data_container: PandasObject) -> PandasObject:
519536
def _to_datetime(col: pd.Series) -> pd.Series:
520-
col = pd.to_datetime(col, **self.to_datetime_kwargs)
537+
# NOTE: this is a hack to support koalas. This needs to be
538+
# thoroughly tested, right now koalas returns NA when a dtype value
539+
# can't be coerced into the target dtype.
540+
to_datetime_fn = pd.to_datetime
541+
if type(col).__module__.startswith(
542+
"databricks.koalas"
543+
): # pragma: no cover
544+
# pylint: disable=import-outside-toplevel
545+
import databricks.koalas as ks
546+
547+
to_datetime_fn = ks.to_datetime
548+
col = to_datetime_fn(col, **self.to_datetime_kwargs)
521549
return col.astype(self.type)
522550

523551
if isinstance(data_container, pd.DataFrame):

0 commit comments

Comments
 (0)