Design Data Types Library That Supports Both PySpark & Pandas #1360

Hi, I have multiple data types I commonly work with, sometimes in pandas and sometimes in pyspark.
I don't want to create two pandera DataFrameModels for each type; that seems like really bad practice.
What's the best way to do this currently?
Is there also a way to write code that will work on both pyspark and pandas?
Any comments on this?
Long story short, the primitives are there, it'll just be some work before we can realize the vision of "one DataFrameModel to rule them all" 💍. I see this question can be broken down into two sub-problems:

1. Creating a generic DataFrameModel that isn't tied to a single dataframe library.
2. Supporting pandera's library-agnostic data types as equivalent types in each library-specific backend.
Agreed, but you might be surprised how challenging this is to get right 🙃. I'd say (2) is a little easier to tackle right now: basically we'd need to add the library-agnostic data types as supported equivalent types in the respective type engines. As for (1), that's going to take more work, but basically we'd need to create a generic DataFrameModel class. @NeerajMalhotra-QB @jaskaransinghsidana FYI. I guess to kick this effort off, would you mind sharing some example code of what you're doing today, @lior5654?
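As a minimal sketch of what (2) points toward: pandera already ships library-agnostic dtypes that can be used as model annotations. This validates through the pandas backend today; the open work is making the same annotations resolve in every backend. The model and field names below are purely illustrative:

import pandas as pd
import pandera as pa
from pandera.dtypes import Int64, String

# Fields are annotated with pandera's library-agnostic dtypes rather than
# pandas-specific Series types.
class Product(pa.DataFrameModel):
    name: String
    price: Int64 = pa.Field(ge=0)

Product.validate(pd.DataFrame({"name": ["widget"], "price": [10]}))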
Thanks for the detailed answer! I really appreciate it. I'll share a minimal PoC soon, but another question arises: do you recommend other libraries for this these days? Are you aware of any libraries that currently support such concepts? @cosmicBboy
@lior5654 as far as I know, there are no other efforts to create a "unified dataframe model for schema validation"... The "one DataFrameModel to rule them all" really is the goal here, but for the longest time pandera only supported pandas and pandas-compliant dataframe libraries. Now, with the efforts to support polars and ibis, I think we're in a good position to generalize the API so we can have a generic DataFrameModel.
Thanks again for the reply. Before I send code samples, I have another question.
But I think my intentions are pretty clear, so here's a code sample:

import pyspark.pandas as ps
import pandas as pd
import pandera as pa
# should be GENERIC, for ALL pandas-compliant APIs
# in the future: also for non-pandas-compliant APIs (only where applicable)
# currently this is for pandas itself only
from pandera.typing import DataFrame, Series
class Schema(pa.DataFrameModel):
state: Series[str]
city: Series[str]
price: Series[int] = pa.Field(in_range={"min_value": 5, "max_value": 20})
# validate a pd.DataFrame (this will work)
# validate a ps.DataFrame (pyspark.pandas has its own Series type, so you'd need
# to define the same class again with a different Series annotation)

I guess for a start, supporting all pandas-compliant APIs with a single class should be easy, right?
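For context, the duplication being described would look roughly like this today, using the separate pyspark.pandas typing module (a sketch, assuming pyspark is installed; the class names are illustrative):

import pandera as pa
from pandera.typing import Series as PandasSeries
from pandera.typing.pyspark import Series as PysparkSeries

# The same schema declared twice, once per Series annotation.
class PandasPriceSchema(pa.DataFrameModel):
    price: PandasSeries[int] = pa.Field(in_range={"min_value": 5, "max_value": 20})

class PysparkPriceSchema(pa.DataFrameModel):
    price: PysparkSeries[int] = pa.Field(in_range={"min_value": 5, "max_value": 20})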
Oh, I just tested, and it seems like the pandera pandas DataFrameModel works seamlessly with the pyspark.pandas API. Great to know!
Yep! The pyspark.pandas integration has been around for longer.
Yes, this is possible today with the backend extensions plugin. This is currently done for dask, modin, and pyspark.pandas. The challenge is making ...
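Concretely, what "works seamlessly" means here is roughly the following (a minimal sketch, assuming pyspark is installed and a local Spark session can be created):

import pandas as pd
import pyspark.pandas as ps
import pandera as pa
from pandera.typing import Series

# A single pandas-style model ...
class PriceSchema(pa.DataFrameModel):
    price: Series[int] = pa.Field(in_range={"min_value": 5, "max_value": 20})

# ... validates both a pandas and a pyspark.pandas dataframe, because the
# pyspark.pandas backend is wired in through the backend extension mechanism.
PriceSchema.validate(pd.DataFrame({"price": [6, 19]}))
PriceSchema.validate(ps.DataFrame({"price": [6, 19]}))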
Is there any update now that polars (#1064) is also supported, @cosmicBboy? Poking around the API and pandera's source code, it seems like I'm already very close to being able to define a functional shared DataFrameModel. Here is a POC that seems to work, albeit I can't validate directly with the shared model. I guess it should be straightforward to enable validation with the shared model by having it dispatch to the appropriate library-specific backend.

import pandas as pd
import polars as pl
# Pandera pandas
import pandera as pd_pa
# Pandera polars
import pandera.polars as pl_pa
# Pandera library-agnostic
from pandera.api.dataframe.model import DataFrameModel
from pandera.api.dataframe.model_components import Field
from pandera.dtypes import String, Int64
from pandera.errors import SchemaErrors
class SharedModel(DataFrameModel):
a_str: String = Field(isin=["ok"])
an_int: Int64 = Field(gt=0)
class PandasModel(SharedModel, pd_pa.DataFrameModel):
...
class PolarsModel(SharedModel, pl_pa.DataFrameModel):
...
def main():
# define a pandas and a polars dataframe with the same content
pa_df = pd.DataFrame({
"a_str": [
"ok",
"not ok", # should raise
],
"an_int": [
-1, # should raise
1,
]
})
pl_df = pl.from_pandas(pa_df)
print("pandas dataframe / model works")
try:
PandasModel.validate(pa_df, lazy=True)
assert False
except SchemaErrors as e:
print(e)
print("polars dataframe / model works")
try:
PolarsModel.validate(pl_df, lazy=True)
assert False
except SchemaErrors as e:
print(e)
print("pandas / polars dataframe with shared model doesn't work: raises NotImplementedError")
for df in (pa_df, pl_df):
try:
SharedModel.validate(df, lazy=True)
assert False
except NotImplementedError:
...
if __name__ == "__main__":
    main()

This prints the SchemaErrors report for both the pandas and the polars model, and confirms that validating with the shared model raises NotImplementedError.
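One way to bridge that last gap on top of the POC above, sketched as a plain helper rather than anything pandera provides (the function name is made up):

import pandas as pd
import polars as pl

# Hypothetical helper: route validation to the library-specific model defined above.
def validate_shared(df, lazy: bool = True):
    if isinstance(df, pd.DataFrame):
        return PandasModel.validate(df, lazy=lazy)
    if isinstance(df, pl.DataFrame):
        return PolarsModel.validate(df, lazy=lazy)
    raise TypeError(f"unsupported dataframe type: {type(df)!r}")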
Currently using the following code to have both of them under the same schema:

import typing
from dataclasses import dataclass
from functools import wraps
from typing import Any, Dict, List, Optional
import pandas as pd
import pandera as pa
import pandera.pyspark as psa
import pyspark.sql as ps
import pyspark.sql.types as T
from pandera.decorators import _handle_schema_error
@dataclass
class Column:
"""Data class to represent a class agnostic Pandera Column."""
type_: Any
checks: Optional[List] = None
nullable: bool = True
@dataclass
class DataFrameSchema:
"""Data class to represent a class agnostic Pandera DataFrameSchema."""
columns: Dict[str, Column]
unique: Optional[List] = None
    def build_for_type(self, type_) -> typing.Union[pa.DataFrameSchema, psa.DataFrameSchema]:
# Build pandas version
if type_ is pd.DataFrame:
return pa.DataFrameSchema(
columns={
name: pa.Column(col.type_, checks=col.checks, nullable=col.nullable)
for name, col in self.columns.items()
},
unique=self.unique,
)
# Build pyspark version
if type_ is ps.DataFrame:
return psa.DataFrameSchema(
columns={
name: psa.Column(col.type_, checks=col.checks, nullable=col.nullable)
for name, col in self.columns.items()
},
unique=self.unique,
)
raise TypeError()
def check_output(schema: DataFrameSchema):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Extract return type of function
if not (type_ := typing.get_type_hints(func).get("return")):
raise RuntimeError(f"No output typehint specified!")
# Build validator
df_schema: psa.DataFrameSchema = schema.build_for_type(type_)
# Invoke function
df = func(*args, **kwargs)
# Run validation
try:
df_schema.validate(df, lazy=False)
except pa.errors.SchemaError as e:
_handle_schema_error("check_output", func, df_schema, df, e)
return df
return wrapper
return decorator
@check_output(DataFrameSchema(columns={"id": Column(int)}, unique=["id"]))
def dummy_pandas_fn() -> pd.DataFrame:
return pd.DataFrame({"id": [1]})
@check_output(DataFrameSchema(columns={"bucket": Column(int, nullable=False)}, unique=["bucket"]))
def dummy_spark_fn(num_buckets: int = 10) -> ps.DataFrame:
# Construct df to bucketize
spark_session: ps.SparkSession = ps.SparkSession.builder.getOrCreate()
# Bucketize df
return spark_session.createDataFrame(
        data=[(bucket,) for bucket in range(num_buckets)],
schema=T.StructType([T.StructField("bucket", T.IntegerType())]),
)
# Invoke the fn
dummy_spark_fn()
dummy_pandas_fn()
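As a small usage sketch building on the dataclasses above (no new pandera API involved), the same shared definition can also validate a dataframe you already have in hand, without going through the decorator:

# Build the library-specific schema explicitly and validate an existing dataframe.
shared_schema = DataFrameSchema(columns={"id": Column(int)}, unique=["id"])
shared_schema.build_for_type(pd.DataFrame).validate(pd.DataFrame({"id": [1, 2, 3]}))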