
Commit f502d66

xi-db authored and hvanhovell committed
[SPARK-52450][CONNECT] Improve performance of schema deepcopy
### What changes were proposed in this pull request?

In Spark Connect, `DataFrame.schema` returns a deep copy of the schema to prevent unexpected behavior caused by user modifications to the returned schema object. However, if a user accesses `df.schema` repeatedly on a DataFrame with a complex schema, this can lead to noticeable performance degradation. The issue can be reproduced with the code snippet below. Since `copy.deepcopy` is known to be slow on complex objects, this PR replaces it with pickle-based serialization/deserialization to improve the performance of `df.schema` access. Given the limitations of pickle, the implementation falls back to `copy.deepcopy` in cases where pickling fails.

```python
from pyspark.sql.types import StructType, StructField, StringType

def make_nested_struct(level, max_level, fields_per_level):
    if level == max_level - 1:
        return StructType(
            [StructField(f"f{level}_{i}", StringType(), True) for i in range(fields_per_level)]
        )
    else:
        return StructType(
            [
                StructField(
                    f"s{level}_{i}",
                    make_nested_struct(level + 1, max_level, fields_per_level),
                    True,
                )
                for i in range(fields_per_level)
            ]
        )

# Create a 4-level nested schema with 10,000 leaf fields in total
schema = make_nested_struct(0, 4, 10)
```

The existing approach needs 21.9s to copy the schema 100 times:

```python
import copy
import timeit

timeit.timeit(lambda: copy.deepcopy(schema), number=100)  # 21.9
```

The updated approach needs only 2.0s for 100 copies:

```python
from pyspark.serializers import CPickleSerializer

cached_schema_serialized = CPickleSerializer().dumps(schema)
timeit.timeit(lambda: CPickleSerializer().loads(cached_schema_serialized), number=100)  # 2.0
```

### Why are the changes needed?

It improves performance when `df.schema` is called many times.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests and new tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51157 from xi-db/schema-deepcopy-improvement.

Lead-authored-by: Xi Lyu <[email protected]>
Co-authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
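For illustration, the serialize-once, unpickle-per-access pattern described above can be sketched independently of the DataFrame internals. The `CachedSchema` helper below is purely illustrative (its name and structure are not part of the actual change); it only demonstrates caching the pickled bytes and falling back to `copy.deepcopy` when pickling fails:

```python
import copy
from typing import Optional

from pyspark.serializers import CPickleSerializer
from pyspark.sql.types import StructType, StructField, StringType


class CachedSchema:
    """Illustrative holder: pickle the schema once, unpickle on each access."""

    def __init__(self, schema: StructType):
        self._schema = schema
        self._serialized: Optional[bytes] = None
        try:
            # One-time pickling; the bytes are reused for every subsequent copy.
            self._serialized = CPickleSerializer().dumps(schema)
        except Exception:
            # Some objects may not be picklable; fall back to deepcopy on access.
            self._serialized = None

    def copy(self) -> StructType:
        if self._serialized is not None:
            try:
                # Unpickling cached bytes is much cheaper than deepcopy.
                return CPickleSerializer().loads(self._serialized)
            except Exception:
                pass
        return copy.deepcopy(self._schema)


# Usage: each call returns an independent StructType instance.
cached = CachedSchema(StructType([StructField("name", StringType(), True)]))
assert cached.copy() is not cached.copy()
```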
1 parent 509f936 commit f502d66

2 files changed (+26, -1)


python/pyspark/sql/connect/dataframe.py

Lines changed: 17 additions & 1 deletion
```diff
@@ -22,6 +22,7 @@
     PySparkAttributeError,
 )
 from pyspark.resource import ResourceProfile
+from pyspark.sql.connect.logging import logger
 from pyspark.sql.connect.utils import check_dependencies
 
 check_dependencies(__name__)
@@ -69,6 +70,7 @@
     PySparkRuntimeError,
 )
 from pyspark.util import PythonEvalType
+from pyspark.serializers import CPickleSerializer
 from pyspark.storagelevel import StorageLevel
 import pyspark.sql.connect.plan as plan
 from pyspark.sql.conversion import ArrowTableToRowsConversion
@@ -141,6 +143,7 @@ def __init__(
         # by __repr__ and _repr_html_ while eager evaluation opens.
         self._support_repr_html = False
         self._cached_schema: Optional[StructType] = None
+        self._cached_schema_serialized: Optional[bytes] = None
         self._execution_info: Optional["ExecutionInfo"] = None
 
     def __reduce__(self) -> Tuple:
@@ -1836,11 +1839,24 @@ def _schema(self) -> StructType:
         if self._cached_schema is None:
             query = self._plan.to_proto(self._session.client)
             self._cached_schema = self._session.client.schema(query)
+            try:
+                self._cached_schema_serialized = CPickleSerializer().dumps(self._schema)
+            except Exception as e:
+                logger.warn(f"DataFrame schema pickle dumps failed with exception: {e}.")
+                self._cached_schema_serialized = None
         return self._cached_schema
 
     @property
     def schema(self) -> StructType:
-        return copy.deepcopy(self._schema)
+        # The self._schema call caches the schema and serializes it if not cached yet.
+        _schema = self._schema
+        if self._cached_schema_serialized is not None:
+            try:
+                return CPickleSerializer().loads(self._cached_schema_serialized)
+            except Exception as e:
+                logger.warn(f"DataFrame schema pickle loads failed with exception: {e}.")
+        # In case of pickle ser/de failure, fall back to the deepcopy approach.
+        return copy.deepcopy(_schema)
 
     @functools.cache
     def isLocal(self) -> bool:
```

python/pyspark/sql/tests/connect/test_connect_dataframe_property.py

Lines changed: 9 additions & 0 deletions
```diff
@@ -72,6 +72,15 @@ def test_cached_property_is_copied(self):
             df_columns.remove(col)
         assert len(df.columns) == 4
 
+        cdf = self.connect.createDataFrame(data, schema)
+        cdf_schema = cdf.schema
+        assert len(cdf._cached_schema_serialized) > 0
+        assert cdf_schema.jsonValue() == cdf._cached_schema.jsonValue()
+        assert len(cdf_schema.fields) == 4
+        cdf_schema.fields.pop(0)
+        assert cdf.schema.jsonValue() == cdf._cached_schema.jsonValue()
+        assert len(cdf.schema.fields) == 4
+
     def test_cached_schema_to(self):
         rows = [Row(id=x, name=str(x)) for x in range(100)]
         cdf = self.connect.createDataFrame(rows)
```
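As a usage note, the behavior exercised by the new test can also be observed from an ordinary Spark Connect session. The sketch below assumes a Spark Connect server reachable at `sc://localhost` (the URL is illustrative):

```python
from pyspark.sql import SparkSession

# Assumes a Spark Connect server is running at this URL (illustrative).
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

df = spark.createDataFrame([(1, "a")], ["id", "name"])

s1 = df.schema  # first access fetches, caches, and pickles the schema
s2 = df.schema  # later accesses unpickle the cached bytes

# Each access returns an independent copy; mutating one copy does not
# affect the cached schema or subsequent accesses.
s1.fields.pop(0)
assert len(s2.fields) == 2
assert len(df.schema.fields) == 2
```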
