Commit 207e296

HyukjinKwon authored and cloud-fan committed
[SPARK-51789][CORE][FOLLOW-UP] Set the initial Spark Connect mode properly
### What changes were proposed in this pull request?

This PR is a followup of #50575 that sets the initial Spark Connect mode by reading `SPARK_CONNECT_MODE`.

### Why are the changes needed?

In the Spark 4.0 release (Spark Connect distribution),

```bash
SPARK_CONNECT_MODE=1 ./bin/spark-shell
```

starts a Spark Classic shell.

### Does this PR introduce _any_ user-facing change?

No to end users, because the main change has not been released yet.

### How was this patch tested?

Manually tested with the combinations below:

```
SPARK_CONNECT_MODE=1 ./bin/spark-shell
SPARK_CONNECT_MODE=1 ./bin/spark-shell --master local
SPARK_CONNECT_MODE=1 ./bin/pyspark --conf spark.api.mode=classic --master local
SPARK_CONNECT_MODE=1 ./bin/pyspark --master local --conf spark.api.mode=connect
SPARK_CONNECT_MODE=1 ./bin/pyspark --conf spark.api.mode=classic
SPARK_CONNECT_MODE=1 ./bin/pyspark --conf spark.api.mode=connect
SPARK_CONNECT_MODE=1 ./bin/pyspark
SPARK_CONNECT_MODE=0 ./bin/spark-shell
SPARK_CONNECT_MODE=0 ./bin/spark-shell --master local
SPARK_CONNECT_MODE=0 ./bin/pyspark --conf spark.api.mode=classic --master local
SPARK_CONNECT_MODE=0 ./bin/pyspark --master local --conf spark.api.mode=connect
SPARK_CONNECT_MODE=0 ./bin/pyspark --conf spark.api.mode=classic
SPARK_CONNECT_MODE=0 ./bin/pyspark --conf spark.api.mode=connect
SPARK_CONNECT_MODE=0 ./bin/pyspark
SPARK_CONNECT_MODE=1 ./bin/spark-shell --master "local[*]" --remote "local[*]"
SPARK_CONNECT_MODE=1 ./bin/pyspark --conf spark.remote="local[*]" --conf spark.api.mode=connect --conf spark.master="local[*]"
SPARK_CONNECT_MODE=1 ./bin/pyspark --master "local[*]" --remote "local[*]"
SPARK_CONNECT_MODE=1 ./bin/pyspark --conf spark.remote="local[*]" --conf spark.api.mode=connect --master "local[*]"
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50846 from HyukjinKwon/SPARK-51789-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent da2b449 · commit 207e296
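The precedence this followup establishes can be summarized in a short sketch. The Python below is illustrative only, not Spark's launcher code (which lives in the Java `SparkSubmitCommandBuilder`): `SPARK_CONNECT_MODE` seeds the initial mode, `spark.remote` forces Connect, and an explicit `spark.api.mode` wins either way.

```python
# Illustrative model only -- not Spark's actual launcher code.
# It mimics the precedence this commit establishes for deciding
# Connect vs. Classic mode.
import os
from typing import Dict


def resolve_is_remote(conf: Dict[str, str]) -> bool:
    # 1. SPARK_CONNECT_MODE seeds the initial value.
    is_remote = os.environ.get("SPARK_CONNECT_MODE") == "1"
    for key, value in conf.items():
        # 2. Setting spark.remote always forces Connect mode.
        if key == "spark.remote":
            is_remote = True
        # 3. An explicit spark.api.mode is respected either way,
        #    so it can also switch Connect mode back off.
        elif key == "spark.api.mode":
            is_remote = value.lower() == "connect"
    return is_remote


# Example: the env var alone enables Connect...
os.environ["SPARK_CONNECT_MODE"] = "1"
assert resolve_is_remote({}) is True
# ...but an explicit classic API mode overrides it.
assert resolve_is_remote({"spark.api.mode": "classic"}) is False
```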

3 files changed (+9 −7 lines)

launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java

Lines changed: 8 additions & 3 deletions

```diff
@@ -126,6 +126,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
     boolean isExample = false;
     List<String> submitArgs = args;
     this.userArgs = Collections.emptyList();
+    isRemote |= "connect".equalsIgnoreCase(getApiMode(conf));
 
     if (args.size() > 0) {
       switch (args.get(0)) {
@@ -549,11 +550,15 @@ protected boolean handle(String opt, String value) {
         checkArgument(value != null, "Missing argument to %s", CONF);
         String[] setConf = value.split("=", 2);
         checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
-        conf.put(setConf[0], setConf[1]);
         // If both spark.remote and spark.master are set, the error will be thrown later when
         // the application is started.
-        isRemote |= conf.containsKey("spark.remote");
-        isRemote |= "connect".equalsIgnoreCase(getApiMode(conf));
+        if (setConf[0].equals("spark.remote")) {
+          isRemote = true;
+        } else if (setConf[0].equals(SparkLauncher.SPARK_API_MODE)) {
+          // Respects if the API mode is explicitly set.
+          isRemote = setConf[1].equalsIgnoreCase("connect");
+        }
+        conf.put(setConf[0], setConf[1]);
       }
       case CLASS -> {
         // The special classes require some special command line handling, since they allow
```
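The subtle part of this hunk is the switch from `|=` accumulation to plain assignment. With `|=`, `isRemote` could only ever be raised to true, so once `SPARK_CONNECT_MODE=1` seeded it, a later `--conf spark.api.mode=classic` was silently ignored. A sketch of the two behaviors in Python (not the launcher code itself):

```python
# Sketch only: contrasts the old "|=" accumulation with the new assignment.
def old_handle(pairs, seeded_remote=True):  # seeded by SPARK_CONNECT_MODE=1
    is_remote = seeded_remote
    for key, value in pairs:
        is_remote |= key == "spark.remote"
        is_remote |= key == "spark.api.mode" and value.lower() == "connect"
    return is_remote  # can never go back to False


def new_handle(pairs, seeded_remote=True):
    is_remote = seeded_remote
    for key, value in pairs:
        if key == "spark.remote":
            is_remote = True
        elif key == "spark.api.mode":
            # Plain assignment: an explicit classic mode now wins.
            is_remote = value.lower() == "connect"
    return is_remote


pairs = [("spark.api.mode", "classic")]
print(old_handle(pairs))  # True  -- the classic request was ignored
print(new_handle(pairs))  # False -- the classic request is respected
```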

python/pyspark/java_gateway.py

Lines changed: 1 addition & 0 deletions

```diff
@@ -83,6 +83,7 @@ def launch_gateway(conf=None, popen_kwargs=None):
         os.unlink(conn_info_file)
 
         env = dict(os.environ)
+        env["SPARK_CONNECT_MODE"] = "0"
         env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file
 
         # Launch the Java gateway.
```
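This one-line change pins `SPARK_CONNECT_MODE=0` in the environment handed to the launched JVM, so the gateway that backs a locally started Connect server always comes up as a Classic session. Because the override lives in the child's `env` dict rather than in `os.environ`, the parent process needs no save-and-restore. A standalone sketch of the pattern (the command below is a placeholder, not the real launch invocation):

```python
# Sketch of the copy-then-override pattern used in launch_gateway().
import os
import subprocess

env = dict(os.environ)           # child-only copy; the parent env is untouched
env["SPARK_CONNECT_MODE"] = "0"  # force Classic mode in the launched JVM

# Placeholder command for illustration; launch_gateway() builds the real one.
subprocess.Popen(["./bin/spark-submit", "--version"], env=env)
```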

python/pyspark/sql/connect/session.py

Lines changed: 0 additions & 4 deletions

```diff
@@ -1074,13 +1074,11 @@ def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
             overwrite_conf["spark.connect.grpc.binding.port"] = "0"
 
             origin_remote = os.environ.get("SPARK_REMOTE", None)
-            origin_connect_mode = os.environ.get("SPARK_CONNECT_MODE", None)
             try:
                 # So SparkSubmit thinks no remote is set in order to
                 # start the regular PySpark session.
                 if origin_remote is not None:
                     del os.environ["SPARK_REMOTE"]
-                os.environ["SPARK_CONNECT_MODE"] = "0"
 
                 # The regular PySpark session is registered as an active session
                 # so would not be garbage-collected.
@@ -1098,8 +1096,6 @@ def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
             finally:
                 if origin_remote is not None:
                     os.environ["SPARK_REMOTE"] = origin_remote
-                if origin_connect_mode is not None:
-                    os.environ["SPARK_CONNECT_MODE"] = origin_connect_mode
         else:
             raise PySparkRuntimeError(
                 errorClass="SESSION_OR_CONTEXT_EXISTS",
```
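These deletions are the counterpart of the `java_gateway.py` change above: since the child process now receives `SPARK_CONNECT_MODE=0` through its own environment, `_start_connect_server` no longer has to mutate the process-wide `os.environ` and undo it in `finally`. In miniature, the removed pattern looked like this (a simplified sketch, not the exact deleted code):

```python
# Simplified sketch of the removed save/mutate/restore dance.
import os

origin = os.environ.get("SPARK_CONNECT_MODE", None)
try:
    # Visible to every thread in the process -- the reason it was fragile.
    os.environ["SPARK_CONNECT_MODE"] = "0"
    ...  # start the regular (Classic) PySpark session here
finally:
    if origin is not None:
        os.environ["SPARK_CONNECT_MODE"] = origin
```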
