Commit e2ca839

Merge remote-tracking branch 'origin/master' into arrowudf
2 parents: 8ee878c + 7cc2896

80 files changed: +946 additions, -166 deletions


common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java

Lines changed: 5 additions & 6 deletions
@@ -26,8 +26,6 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.Channel;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.spark.internal.SparkLogger;
 import org.apache.spark.internal.SparkLoggerFactory;
@@ -45,6 +43,7 @@
 import org.apache.spark.network.server.MessageHandler;
 import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 import org.apache.spark.network.util.TransportFrameDecoder;
+import org.apache.spark.util.Pair;
 
 /**
  * Handler that processes server responses, in response to requests issued from a
@@ -96,7 +95,7 @@ public void removeRpcRequest(long requestId) {
 
   public void addStreamCallback(String streamId, StreamCallback callback) {
     updateTimeOfLastRequest();
-    streamCallbacks.offer(ImmutablePair.of(streamId, callback));
+    streamCallbacks.offer(Pair.of(streamId, callback));
   }
 
   @VisibleForTesting
@@ -125,7 +124,7 @@ private void failOutstandingRequests(Throwable cause) {
     }
     for (Pair<String, StreamCallback> entry : streamCallbacks) {
       try {
-        entry.getValue().onFailure(entry.getKey(), cause);
+        entry.getRight().onFailure(entry.getLeft(), cause);
       } catch (Exception e) {
         logger.warn("StreamCallback.onFailure throws exception", e);
       }
@@ -236,7 +235,7 @@ public void handle(ResponseMessage message) throws Exception {
     } else if (message instanceof StreamResponse resp) {
       Pair<String, StreamCallback> entry = streamCallbacks.poll();
       if (entry != null) {
-        StreamCallback callback = entry.getValue();
+        StreamCallback callback = entry.getRight();
         if (resp.byteCount > 0) {
           StreamInterceptor<ResponseMessage> interceptor = new StreamInterceptor<>(
             this, resp.streamId, resp.byteCount, callback);
@@ -262,7 +261,7 @@ public void handle(ResponseMessage message) throws Exception {
     } else if (message instanceof StreamFailure resp) {
       Pair<String, StreamCallback> entry = streamCallbacks.poll();
       if (entry != null) {
-        StreamCallback callback = entry.getValue();
+        StreamCallback callback = entry.getRight();
         try {
           callback.onFailure(resp.streamId, new RuntimeException(resp.error));
         } catch (IOException ioe) {

common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java

Lines changed: 2 additions & 3 deletions
@@ -26,13 +26,12 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.spark.internal.SparkLogger;
 import org.apache.spark.internal.SparkLoggerFactory;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.util.Pair;
 
 /**
  * StreamManager which allows registration of an Iterator&lt;ManagedBuffer&gt;, which are
@@ -127,7 +126,7 @@ public static Pair<Long, Integer> parseStreamChunkId(String streamChunkId) {
       "Stream id and chunk index should be specified.";
     long streamId = Long.valueOf(array[0]);
     int chunkIndex = Integer.valueOf(array[1]);
-    return ImmutablePair.of(streamId, chunkIndex);
+    return Pair.of(streamId, chunkIndex);
   }
 
   @Override
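
For orientation, a minimal sketch (not part of this commit) of what the reworked parseStreamChunkId returns after the switch to org.apache.spark.util.Pair. It assumes the "<streamId>_<chunkIndex>" string format this class uses when generating stream chunk ids; the values are made up.

import org.apache.spark.network.server.OneForOneStreamManager

object ParseStreamChunkIdSketch {
  def main(args: Array[String]): Unit = {
    // "12_3" encodes streamId = 12 and chunkIndex = 3 (illustrative values only).
    val parsed = OneForOneStreamManager.parseStreamChunkId("12_3")
    // The replacement record keeps the commons-lang3 accessor names, so call sites stay the same.
    assert(parsed.getLeft == 12L)   // stream id
    assert(parsed.getRight == 3)    // chunk index
  }
}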

common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java

Lines changed: 2 additions & 3 deletions
@@ -27,15 +27,14 @@
 
 import static org.mockito.Mockito.*;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.protocol.*;
 import org.apache.spark.network.server.ChunkFetchRequestHandler;
 import org.apache.spark.network.server.NoOpRpcHandler;
 import org.apache.spark.network.server.OneForOneStreamManager;
 import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.util.Pair;
 
 public class ChunkFetchRequestHandlerSuite {
 
@@ -54,7 +53,7 @@ public void handleChunkFetchRequest() throws Exception {
       .thenAnswer(invocationOnMock0 -> {
         Object response = invocationOnMock0.getArguments()[0];
         ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel);
-        responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture));
+        responseAndPromisePairs.add(Pair.of(response, channelFuture));
         return channelFuture;
       });
 

common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java

Lines changed: 2 additions & 3 deletions
@@ -26,8 +26,6 @@
 
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -41,6 +39,7 @@
 import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.MapConfigProvider;
 import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.util.Pair;
 
 public class RpcIntegrationSuite {
   static TransportConf conf;
@@ -408,7 +407,7 @@ private Pair<Set<String>, Set<String>> checkErrorsContain(
         notFound.add(contain);
       }
     }
-    return new ImmutablePair<>(remainingErrors, notFound);
+    return new Pair<>(remainingErrors, notFound);
   }
 
   private static class VerifyingStreamCallback implements StreamCallbackWithID {

common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java

Lines changed: 3 additions & 4 deletions
@@ -28,8 +28,6 @@
 import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.Mockito.*;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
@@ -39,6 +37,7 @@
 import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.server.StreamManager;
 import org.apache.spark.network.server.TransportRequestHandler;
+import org.apache.spark.util.Pair;
 
 public class TransportRequestHandlerSuite {
 
@@ -53,7 +52,7 @@ public void handleStreamRequest() throws Exception {
       .thenAnswer(invocationOnMock0 -> {
         Object response = invocationOnMock0.getArguments()[0];
         ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel);
-        responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture));
+        responseAndPromisePairs.add(Pair.of(response, channelFuture));
         return channelFuture;
       });
 
@@ -145,7 +144,7 @@ public MergedBlockMetaReqHandler getMergedBlockMetaReqHandler() {
     when(channel.writeAndFlush(any())).thenAnswer(invocationOnMock0 -> {
       Object response = invocationOnMock0.getArguments()[0];
       ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel);
-      responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture));
+      responseAndPromisePairs.add(Pair.of(response, channelFuture));
       return channelFuture;
     });
 

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 2 additions & 2 deletions
@@ -28,7 +28,6 @@
 
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.commons.lang3.tuple.Pair;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -56,6 +55,7 @@
 import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.NettyUtils;
 import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.util.Pair;
 
 /**
  * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
@@ -400,7 +400,7 @@ public Map<String, String[]> getLocalDirs(String appId, Set<String> execIds) {
         }
         return Pair.of(exec, info.localDirs);
       })
-      .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+      .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
   }
 
   /**

org/apache/spark/util/Pair.java (new file)

Lines changed: 29 additions & 0 deletions

@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util;
+
+/**
+ * An immutable pair of values. Note that the fields are intentionally designed to be `getLeft` and
+ * `getRight` instead of `left` and `right` in order to mitigate the migration burden
+ * from `org.apache.commons.lang3.tuple.Pair`.
+ */
+public record Pair<L, R>(L getLeft, R getRight) {
+  public static <L, R> Pair<L, R> of(L left, R right) {
+    return new Pair<>(left, right);
+  }
+}
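
A small usage sketch (not part of the commit) of the new record at the call sites rewritten above; the values are illustrative only.

import org.apache.spark.util.Pair

object PairRecordSketch {
  def main(args: Array[String]): Unit = {
    // Where code previously called ImmutablePair.of(...), the record offers the same shape:
    // a static of(...) factory plus getLeft/getRight accessors.
    val entry = Pair.of("streamId-7", "some-callback")  // made-up values
    assert(entry.getLeft == "streamId-7")
    assert(entry.getRight == "some-callback")
    // As a record it also gets equals/hashCode/toString for free, so it still behaves
    // sensibly inside queues, sets, and maps.
    assert(entry == Pair.of("streamId-7", "some-callback"))
  }
}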

common/utils/src/main/scala/org/apache/spark/util/SparkStringUtils.scala

Lines changed: 79 additions & 1 deletion
@@ -16,11 +16,89 @@
  */
 package org.apache.spark.util
 
+import java.util.HexFormat
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ArrayImplicits._
+
 private[spark] trait SparkStringUtils {
+  private final lazy val SPACE_DELIMITED_UPPERCASE_HEX =
+    HexFormat.of().withDelimiter(" ").withUpperCase()
+
+  /**
+   * Returns a pretty string of the byte array which prints each byte as a hex digit and add
+   * spaces between them. For example, [1A C0].
+   */
+  def getHexString(bytes: Array[Byte]): String = {
+    s"[${SPACE_DELIMITED_UPPERCASE_HEX.formatHex(bytes)}]"
+  }
+
+  def sideBySide(left: String, right: String): Seq[String] = {
+    sideBySide(left.split("\n").toImmutableArraySeq, right.split("\n").toImmutableArraySeq)
+  }
+
+  def sideBySide(left: Seq[String], right: Seq[String]): Seq[String] = {
+    val maxLeftSize = left.map(_.length).max
+    val leftPadded = left ++ Seq.fill(math.max(right.size - left.size, 0))("")
+    val rightPadded = right ++ Seq.fill(math.max(left.size - right.size, 0))("")
+
+    leftPadded.zip(rightPadded).map { case (l, r) =>
+      (if (l == r) " " else "!") + l + (" " * ((maxLeftSize - l.length) + 3)) + r
+    }
+  }
+
   def stringToSeq(str: String): Seq[String] = {
     import org.apache.spark.util.ArrayImplicits._
     str.split(",").map(_.trim()).filter(_.nonEmpty).toImmutableArraySeq
   }
 }
 
-private[spark] object SparkStringUtils extends SparkStringUtils
+private[spark] object SparkStringUtils extends SparkStringUtils with Logging {
+
+  /** Whether we have warned about plan string truncation yet. */
+  private val truncationWarningPrinted = new AtomicBoolean(false)
+
+  /**
+   * Format a sequence with semantics similar to calling .mkString(). Any elements beyond
+   * `maxFields` will be dropped and replaced by a "... N more fields" placeholder.
+   *
+   * @return
+   *   the trimmed and formatted string.
+   */
+  def truncatedString[T](
+      seq: Seq[T],
+      start: String,
+      sep: String,
+      end: String,
+      maxFields: Int,
+      customToString: Option[T => String] = None): String = {
+    if (seq.length > maxFields) {
+      if (truncationWarningPrinted.compareAndSet(false, true)) {
+        logWarning(
+          "Truncated the string representation of a plan since it was too large. This " +
+            s"behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.")
+      }
+      val numFields = math.max(0, maxFields)
+      val restNum = seq.length - numFields
+      val ending = (if (numFields == 0) "" else sep) +
+        (if (restNum == 0) "" else s"... $restNum more fields") + end
+      if (customToString.isDefined) {
+        seq.take(numFields).map(customToString.get).mkString(start, sep, ending)
+      } else {
+        seq.take(numFields).mkString(start, sep, ending)
+      }
+    } else {
+      if (customToString.isDefined) {
+        seq.map(customToString.get).mkString(start, sep, end)
+      } else {
+        seq.mkString(start, sep, end)
      }
+    }
+  }
+
+  /** Shorthand for calling truncatedString() without start or end strings. */
+  def truncatedString[T](seq: Seq[T], sep: String, maxFields: Int): String = {
+    truncatedString(seq, "", sep, "", maxFields)
+  }
+}
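
A short usage sketch (not part of the commit) of the string helpers added above. SparkStringUtils is private[spark], so this only compiles inside the org.apache.spark namespace; the inputs are invented.

package org.apache.spark.util

object SparkStringUtilsSketch {
  def main(args: Array[String]): Unit = {
    // Bytes rendered as space-delimited upper-case hex.
    println(SparkStringUtils.getHexString(Array[Byte](0x1a, 0xc0.toByte)))
    // prints: [1A C0]

    // Keeps the first maxFields elements and appends a "... N more fields" marker.
    println(SparkStringUtils.truncatedString(Seq(1, 2, 3, 4, 5), "[", ", ", "]", 3))
    // prints: [1, 2, 3, ... 2 more fields]

    // Lines that differ between the two inputs are flagged with a leading "!".
    SparkStringUtils.sideBySide("a\nb", "a\nc").foreach(println)
  }
}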

common/utils/src/test/java/org/apache/spark/util/SparkLoggerSuiteBase.java

Lines changed: 0 additions & 1 deletion
@@ -22,7 +22,6 @@
 import java.nio.file.Files;
 import java.util.List;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.logging.log4j.Level;
 import org.junit.jupiter.api.Test;
 
dev/checkstyle.xml

Lines changed: 1 addition & 0 deletions
@@ -183,6 +183,7 @@
     <module name="IllegalImport">
         <property name="illegalPkgs" value="org.apache.log4j" />
         <property name="illegalPkgs" value="org.apache.commons.lang" />
+        <property name="illegalPkgs" value="org.apache.commons.lang3.tuple" />
         <property name="illegalClasses" value="org.apache.commons.lang3.JavaVersion" />
     </module>
     <module name="RegexpSinglelineJava">
