Commit abb5593
feat(pyspark): add official support and ci testing with spark connect (#10187)
## Description of changes

This PR adds testing for the pyspark Ibis backend running against Spark Connect. It does so by running a Spark Connect instance as a Docker Compose service, similar to our other client-server backends. The main piece of functionality left untested is UDFs (and therefore JSON unwrapping, which is implemented as a UDF): UDFs effectively require a clone of the Python environment on the server, which seems out of scope for initial Spark Connect support. A connection sketch follows the change summary below.
1 parent 8166717 · commit abb5593

25 files changed: +581 −462 lines
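
For context, a minimal sketch of how this setup is exercised from the client side: build a Spark Connect session against the compose service's port and hand it to the Ibis pyspark backend. The endpoint `sc://localhost:15002` comes from the CI matrix below; the exact test wiring in the repo may differ.

```python
# Sketch only: assumes the spark-connect compose service below is running and
# that the pyspark backend accepts a Spark Connect SparkSession, as this PR adds.
from pyspark.sql import SparkSession

import ibis

# Build a client-side session against the Spark Connect gRPC endpoint.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Hand the remote session to Ibis like a classic SparkSession.
con = ibis.pyspark.connect(session=spark)
print(con.list_tables())
```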

.env

Lines changed: 1 addition & 0 deletions
@@ -5,3 +5,4 @@ PGPASSWORD="postgres"
 MYSQL_PWD="ibis"
 MSSQL_SA_PASSWORD="1bis_Testing!"
 DRUID_URL="druid://localhost:8082/druid/v2/sql"
+SPARK_CONFIG=./docker/spark-connect/conf.properties

.github/workflows/ibis-backends.yml

Lines changed: 25 additions & 8 deletions
@@ -442,13 +442,9 @@ jobs:
       - name: download backend data
         run: just download-data

-      - name: show docker compose version
-        if: matrix.backend.services != null
-        run: docker compose version
-
       - name: start services
         if: matrix.backend.services != null
-        run: docker compose up --wait ${{ join(matrix.backend.services, ' ') }}
+        run: just up ${{ join(matrix.backend.services, ' ') }}

       - name: install python
         uses: actions/setup-python@v5
@@ -600,7 +596,7 @@ jobs:

       - name: start services
         if: matrix.backend.services != null
-        run: docker compose up --wait ${{ join(matrix.backend.services, ' ') }}
+        run: just up ${{ join(matrix.backend.services, ' ') }}

       - name: install python
         uses: actions/setup-python@v5
@@ -653,7 +649,7 @@ jobs:
         run: docker compose logs

   test_pyspark:
-    name: PySpark ${{ matrix.pyspark-minor-version }} ubuntu-latest python-${{ matrix.python-version }}
+    name: PySpark ${{ matrix.tag }} ${{ matrix.pyspark-minor-version }} ubuntu-latest python-${{ matrix.python-version }}
     runs-on: ubuntu-latest
     strategy:
       fail-fast: false
@@ -665,19 +661,29 @@ jobs:
             deps:
               - "'pandas@<2'"
               - "'numpy@<1.24'"
+            tag: local
           - python-version: "3.11"
             pyspark-version: "3.5.2"
             pyspark-minor-version: "3.5"
             deps:
               - "'pandas@>2'"
               - "'numpy@>1.24'"
+            tag: local
           - python-version: "3.12"
             pyspark-version: "3.5.2"
             pyspark-minor-version: "3.5"
             deps:
               - "'pandas@>2'"
               - "'numpy@>1.24'"
               - setuptools
+            tag: local
+          - python-version: "3.12"
+            pyspark-version: "3.5.2"
+            pyspark-minor-version: "3.5"
+            deps:
+              - setuptools
+            tag: remote
+            SPARK_REMOTE: "sc://localhost:15002"
       steps:
         - name: checkout
           uses: actions/checkout@v4
@@ -691,6 +697,10 @@ jobs:
         env:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

+      - name: start services
+        if: matrix.tag == 'remote'
+        run: just up spark-connect
+
       - name: download backend data
         run: just download-data

@@ -730,7 +740,14 @@ jobs:
         shell: bash
         run: just download-iceberg-jar ${{ matrix.pyspark-minor-version }}

-      - name: run tests
+      - name: run spark connect tests
+        if: matrix.tag == 'remote'
+        run: just ci-check -m pyspark
+        env:
+          SPARK_REMOTE: ${{ matrix.SPARK_REMOTE }}
+
+      - name: run spark tests
+        if: matrix.tag == 'local'
         run: just ci-check -m pyspark

       - name: check that no untracked files were produced
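
Not shown in this diff is how the test suite decides between a local session and the Spark Connect server. A hypothetical sketch of that decision, keyed off the `SPARK_REMOTE` variable exported by the "remote" matrix entry, might look like the following; the repository's actual test fixture may differ.

```python
# Hypothetical helper, not the repo's conftest: pick a Spark Connect session
# when SPARK_REMOTE is set (the "remote" matrix entry), otherwise a local one.
import os

from pyspark.sql import SparkSession


def make_test_session() -> SparkSession:
    remote = os.environ.get("SPARK_REMOTE")
    if remote:
        # e.g. "sc://localhost:15002", as exported by the workflow above
        return SparkSession.builder.remote(remote).getOrCreate()
    return SparkSession.builder.appName("ibis_testing").getOrCreate()
```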

compose.yaml

Lines changed: 20 additions & 0 deletions
@@ -589,6 +589,24 @@ services:
     networks:
       - risingwave

+  spark-connect:
+    image: bitnami/spark:3.5.2
+    ports:
+      - 15002:15002
+    command: /opt/bitnami/spark/sbin/start-connect-server.sh --name ibis_testing --packages org.apache.spark:spark-connect_2.12:3.5.2,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2
+    healthcheck:
+      test:
+        - CMD-SHELL
+        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/15002; exit $$?;'
+      interval: 5s
+      retries: 6
+    volumes:
+      - spark-connect:/data
+      - $PWD/docker/spark-connect/conf.properties:/opt/bitnami/spark/conf/spark-defaults.conf:ro
+      # - $PWD/docker/spark-connect/log4j2.properties:/opt/bitnami/spark/conf/log4j2.properties:ro
+    networks:
+      - spark-connect
+
 networks:
   impala:
     # docker defaults to naming networks "$PROJECT_$NETWORK" but the Java Hive
@@ -606,6 +624,7 @@ networks:
   exasol:
   flink:
   risingwave:
+  spark-connect:

 volumes:
   clickhouse:
@@ -617,3 +636,4 @@ volumes:
   exasol:
   impala:
   risingwave:
+  spark-connect:
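
The healthcheck above only verifies that something is accepting TCP connections on port 15002 (the Spark Connect gRPC port); it does not speak gRPC. A rough Python equivalent of that bash `/dev/tcp` probe, for local debugging:

```python
# Rough equivalent of the compose healthcheck: succeed once the Spark Connect
# port accepts a TCP connection. Host/port match the compose service above.
import socket


def spark_connect_port_open(host: str = "127.0.0.1", port: int = 15002) -> bool:
    try:
        with socket.create_connection((host, port), timeout=5):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print(spark_connect_port_open())
```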

docker/spark-connect/conf.properties

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+spark.driver.extraJavaOptions=-Duser.timezone=GMT
+spark.executor.extraJavaOptions=-Duser.timezone=GMT
+spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2
+spark.sql.catalog.local.type=hadoop
+spark.sql.catalog.local.warehouse=warehouse
+spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog
+spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+spark.sql.legacy.timeParserPolicy=LEGACY
+spark.sql.session.timeZone=UTC
+spark.sql.streaming.schemaInference=true
+spark.ui.enabled=false
+spark.ui.showConsoleProgress=false
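
These properties are mounted as the server's spark-defaults.conf, and the `SPARK_CONFIG` entry added to .env points at the same file, presumably so a local (non-Connect) session can share the settings. A hedged sketch of applying the same key=value pairs when building a classic SparkSession; the helper name and loading logic are illustrative, not from the repo.

```python
# Illustrative only: apply the key=value pairs from conf.properties to a
# classic SparkSession builder. Comment lines and blanks are skipped.
from pathlib import Path

from pyspark.sql import SparkSession


def builder_with_properties(path: str):
    builder = SparkSession.builder
    for raw in Path(path).read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        builder = builder.config(key.strip(), value.strip())
    return builder


spark = builder_with_properties("./docker/spark-connect/conf.properties").getOrCreate()
```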
docker/spark-connect/log4j2.properties

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the console
+rootLogger.level = error
+rootLogger.appenderRef.stdout.ref = console
+
+# In the pattern layout configuration below, we specify an explicit `%ex` conversion
+# pattern for logging Throwables. If this was omitted, then (by default) Log4J would
+# implicitly add an `%xEx` conversion pattern which logs stacktraces with additional
+# class packaging information. That extra information can sometimes add a substantial
+# performance overhead, so we disable it in our default logging config.
+# For more information, see SPARK-39361.
+appender.console.type = Console
+appender.console.name = console
+appender.console.target = SYSTEM_ERR
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
+
+# Set the default spark-shell/spark-sql log level to WARN. When running the
+# spark-shell/spark-sql, the log level for these classes is used to overwrite
+# the root logger's log level, so that the user can have different defaults
+# for the shell and regular Spark apps.
+logger.repl.name = org.apache.spark.repl.Main
+logger.repl.level = error
+
+logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
+logger.thriftserver.level = error
+
+# Settings to quiet third party logs that are too verbose
+logger.jetty1.name = org.sparkproject.jetty
+logger.jetty1.level = error
+logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
+logger.jetty2.level = error
+logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper
+logger.replexprTyper.level = error
+logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
+logger.replSparkILoopInterpreter.level = error
+logger.parquet1.name = org.apache.parquet
+logger.parquet1.level = error
+logger.parquet2.name = parquet
+logger.parquet2.level = error
+
+# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
+logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
+logger.RetryingHMSHandler.level = fatal
+logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
+logger.FunctionRegistry.level = error
+
+# For deploying Spark ThriftServer
+# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
+appender.console.filter.1.type = RegexFilter
+appender.console.filter.1.regex = .*Thrift error occurred during processing of message.*
+appender.console.filter.1.onMatch = deny
+appender.console.filter.1.onMismatch = neutral

ibis/backends/pyspark/__init__.py

Lines changed: 21 additions & 17 deletions
@@ -31,9 +31,15 @@
 from ibis.util import deprecated

 try:
-    from pyspark.errors import AnalysisException, ParseException
+    from pyspark.errors import ParseException
+    from pyspark.errors.exceptions.connect import SparkConnectGrpcException
 except ImportError:
-    from pyspark.sql.utils import AnalysisException, ParseException
+    from pyspark.sql.utils import ParseException
+
+    # Use a dummy class for when spark connect is not available
+    class SparkConnectGrpcException(Exception):
+        pass
+

 if TYPE_CHECKING:
     from collections.abc import Mapping, Sequence
@@ -186,13 +192,6 @@ def do_connect(
         # Databricks Serverless compute only supports limited properties
         # and any attempt to set unsupported properties will result in an error.
         # https://docs.databricks.com/en/spark/conf.html
-        try:
-            from pyspark.errors.exceptions.connect import SparkConnectGrpcException
-        except ImportError:
-            # Use a dummy class for when spark connect is not available
-            class SparkConnectGrpcException(Exception):
-                pass
-
         with contextlib.suppress(SparkConnectGrpcException):
             self._session.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")

@@ -456,7 +455,9 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
         df.createTempView(op.name)

     def _finalize_memtable(self, name: str) -> None:
-        self._session.catalog.dropTempView(name)
+        """No-op, otherwise a deadlock can occur when using Spark Connect."""
+        if isinstance(session := self._session, pyspark.sql.SparkSession):
+            session.catalog.dropTempView(name)

     @contextlib.contextmanager
     def _safe_raw_sql(self, query: str) -> Any:
@@ -579,16 +580,20 @@ def get_schema(

         table_loc = self._to_sqlglot_table((catalog, database))
         catalog, db = self._to_catalog_db_tuple(table_loc)
+        session = self._session
         with self._active_catalog_database(catalog, db):
             try:
-                df = self._session.table(table_name)
-            except AnalysisException as e:
-                if not self._session.catalog.tableExists(table_name):
+                df = session.table(table_name)
+                # this is intentionally included in the try block because when
+                # using spark connect, the table-not-found exception coming
+                # from the server will *NOT* be raised until the schema
+                # property is accessed
+                struct = PySparkType.to_ibis(df.schema)
+            except Exception as e:
+                if not session.catalog.tableExists(table_name):
                     raise com.TableNotFound(table_name) from e
                 raise

-        struct = PySparkType.to_ibis(df.schema)
-
         return sch.Schema(struct)

     def create_table(
@@ -752,7 +757,7 @@ def _create_cached_table(self, name, expr):
         query = self.compile(expr)
         t = self._session.sql(query).cache()
         assert t.is_cached
-        t.createOrReplaceTempView(name)
+        t.createTempView(name)
         # store the underlying spark dataframe so we can release memory when
         # asked to, instead of when the session ends
         self._cached_dataframes[name] = t
@@ -761,7 +766,6 @@ def _create_cached_table(self, name, expr):
     def _drop_cached_table(self, name):
         self._session.catalog.dropTempView(name)
         t = self._cached_dataframes.pop(name)
-        assert t.is_cached
         t.unpersist()
         assert not t.is_cached
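
The `get_schema` change above hinges on Spark Connect's laziness: `session.table()` on a missing table may not raise until metadata is actually requested. A small standalone illustration of that behavior, assuming the compose Spark Connect service is running and the table name does not exist:

```python
# Illustration of the laziness the diff's comment describes: with Spark
# Connect, the table-not-found error surfaces when the schema is fetched,
# not when session.table() is called. Endpoint and table name are assumptions.
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

df = spark.table("definitely_not_a_table")  # may not raise here
try:
    _ = df.schema  # the server-side error is raised once metadata is needed
except Exception as exc:
    print(f"caught {type(exc).__name__}: {exc}")
```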
