Skip to content

Commit e396bde

Browse files
authored
Add connection preface process to TripleHttp2Protocol (apache#15368)
1 parent 6f110c1 commit e396bde

File tree

6 files changed

+128
-0
lines changed

6 files changed

+128
-0
lines changed

dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/WireProtocol.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,8 @@ public interface WireProtocol {
3232
void configClientPipeline(URL url, ChannelOperator operator, ContextOperator contextOperator);
3333

3434
void close();
35+
36+
default boolean hasConnectionPreface() {
37+
return false;
38+
}
3539
}

dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/ConnectionHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,11 @@ public interface ConnectionHandler {
3131
* @param channel Channel
3232
*/
3333
void reconnect(Object channel);
34+
35+
/**
36+
* when connection preface is received.
37+
*
38+
* @param channel Channel
39+
*/
40+
void onConnectionPrefaceReceived(Object channel);
3441
}

dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AbstractNettyConnectionClient.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public abstract class AbstractNettyConnectionClient extends AbstractConnectionCl
5757

5858
private ConnectionListener connectionListener;
5959

60+
private AtomicReference<Promise<Void>> connectionPrefaceReceivedPromiseRef;
61+
6062
public static final AttributeKey<AbstractConnectionClient> CONNECTION = AttributeKey.valueOf("connection");
6163

6264
public AbstractNettyConnectionClient(URL url, ChannelHandler handler) throws RemotingException {
@@ -81,6 +83,12 @@ protected void initConnectionClient() {
8183
isReconnecting = new AtomicBoolean(false);
8284
connectionListener = new ConnectionListener();
8385
increase();
86+
87+
// Create connection preface received promise for Http2 client since it should wait server http2 settings frame
88+
// before sending client Headers frame, see details at https://github.com/apache/dubbo/issues/15233
89+
if (protocol != null && protocol.hasConnectionPreface()) {
90+
connectionPrefaceReceivedPromiseRef = new AtomicReference<>();
91+
}
8492
}
8593

8694
protected abstract void initBootstrap() throws Exception;
@@ -129,6 +137,11 @@ protected void doConnect() throws RemotingException {
129137
Future<Void> connectPromise = performConnect();
130138
connectPromise.addListener(connectionListener);
131139

140+
Promise<Void> connectionPrefaceReceivedPromise = null;
141+
if (connectionPrefaceReceivedPromiseRef != null) {
142+
connectionPrefaceReceivedPromise = getOrCreateConnectionPrefaceReceivedPromise();
143+
}
144+
132145
boolean ret = connectingPromise.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
133146
// destroy connectingPromise after used
134147
synchronized (this) {
@@ -168,6 +181,35 @@ protected void doConnect() throws RemotingException {
168181

169182
throw remotingException;
170183
}
184+
185+
if (connectionPrefaceReceivedPromise != null) {
186+
long retainedTimeout = getConnectTimeout() - System.currentTimeMillis() + start;
187+
ret = connectionPrefaceReceivedPromise.awaitUninterruptibly(retainedTimeout, TimeUnit.MILLISECONDS);
188+
// destroy connectionPrefaceReceivedPromise after used
189+
synchronized (this) {
190+
connectionPrefaceReceivedPromiseRef.set(null);
191+
}
192+
if (!ret || !connectionPrefaceReceivedPromise.isSuccess()) {
193+
// 6-2 Client-side connection preface timeout
194+
RemotingException remotingException = new RemotingException(
195+
this,
196+
"client(url: " + getUrl() + ") failed to connect to server " + getConnectAddress()
197+
+ " client-side connection preface timeout " + getConnectTimeout() + "ms (elapsed: "
198+
+ (System.currentTimeMillis() - start) + "ms) from netty client "
199+
+ NetUtils.getLocalHost()
200+
+ " using dubbo version "
201+
+ Version.getVersion());
202+
203+
logger.error(
204+
TRANSPORT_CLIENT_CONNECT_TIMEOUT,
205+
"provider crash",
206+
"",
207+
"Client-side connection preface timeout",
208+
remotingException);
209+
210+
throw remotingException;
211+
}
212+
}
171213
}
172214

173215
protected abstract ChannelFuture performConnect();
@@ -246,6 +288,16 @@ public void onGoaway(Object channel) {
246288
}
247289
}
248290

291+
public void onConnectionPrefaceReceived(Object channel) {
292+
if (!(channel instanceof io.netty.channel.Channel) || connectionPrefaceReceivedPromiseRef == null) {
293+
return;
294+
}
295+
Promise<Void> connectionPrefaceReceivedPromise = connectionPrefaceReceivedPromiseRef.get();
296+
if (connectionPrefaceReceivedPromise != null) {
297+
connectionPrefaceReceivedPromise.trySuccess(null);
298+
}
299+
}
300+
249301
@Override
250302
protected Channel getChannel() {
251303
io.netty.channel.Channel c = getNettyChannel();
@@ -302,6 +354,11 @@ private Promise<Object> getOrCreateConnectingPromise() {
302354
return connectingPromiseRef.get();
303355
}
304356

357+
private Promise<Void> getOrCreateConnectionPrefaceReceivedPromise() {
358+
connectionPrefaceReceivedPromiseRef.compareAndSet(null, new DefaultPromise<>(GlobalEventExecutor.INSTANCE));
359+
return connectionPrefaceReceivedPromiseRef.get();
360+
}
361+
305362
public Promise<Void> getClosePromise() {
306363
return closePromise;
307364
}

dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,14 @@ public void reconnect(Object channel) {
8383
eventLoop.schedule(connectionClient::doReconnect, 1, TimeUnit.SECONDS);
8484
}
8585

86+
@Override
87+
public void onConnectionPrefaceReceived(Object channel) {
88+
if (!(channel instanceof Channel)) {
89+
return;
90+
}
91+
connectionClient.onConnectionPrefaceReceived(channel);
92+
}
93+
8694
@Override
8795
public void channelActive(ChannelHandlerContext ctx) {
8896
ctx.fireChannelActive();

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.dubbo.rpc.protocol.tri.h12.http1.DefaultHttp11ServerTransportListenerFactory;
4242
import org.apache.dubbo.rpc.protocol.tri.h12.http2.GenericHttp2ServerTransportListenerFactory;
4343
import org.apache.dubbo.rpc.protocol.tri.transport.TripleGoAwayHandler;
44+
import org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2SettingsHandler;
4445
import org.apache.dubbo.rpc.protocol.tri.transport.TripleServerConnectionHandler;
4546
import org.apache.dubbo.rpc.protocol.tri.transport.TripleTailHandler;
4647
import org.apache.dubbo.rpc.protocol.tri.websocket.DefaultWebSocketServerTransportListenerFactory;
@@ -95,6 +96,11 @@ public void close() {
9596
super.close();
9697
}
9798

99+
@Override
100+
public boolean hasConnectionPreface() {
101+
return true;
102+
}
103+
98104
@Override
99105
public void configClientPipeline(URL url, ChannelOperator operator, ContextOperator contextOperator) {
100106
TripleConfig tripleConfig = ConfigManager.getProtocolOrDefault(url).getTripleOrDefault();
@@ -116,6 +122,7 @@ public void configClientPipeline(URL url, ChannelOperator operator, ContextOpera
116122
handlers.add(new ChannelHandlerPretender(new Http2MultiplexHandler(new ChannelDuplexHandler())));
117123
handlers.add(new ChannelHandlerPretender(new TriplePingPongHandler(UrlUtils.getCloseTimeout(url))));
118124
handlers.add(new ChannelHandlerPretender(new TripleGoAwayHandler()));
125+
handlers.add(new ChannelHandlerPretender(new TripleHttp2SettingsHandler()));
119126
handlers.add(new ChannelHandlerPretender(new TripleTailHandler()));
120127
operator.configChannelHandler(handlers);
121128
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dubbo.rpc.protocol.tri.transport;
18+
19+
import org.apache.dubbo.common.logger.Logger;
20+
import org.apache.dubbo.common.logger.LoggerFactory;
21+
import org.apache.dubbo.remoting.Constants;
22+
import org.apache.dubbo.remoting.api.connection.ConnectionHandler;
23+
24+
import io.netty.channel.ChannelHandlerContext;
25+
import io.netty.channel.SimpleChannelInboundHandler;
26+
import io.netty.handler.codec.http2.Http2SettingsFrame;
27+
28+
public class TripleHttp2SettingsHandler extends SimpleChannelInboundHandler<Http2SettingsFrame> {
29+
30+
private static final Logger logger = LoggerFactory.getLogger(TripleHttp2SettingsHandler.class);
31+
32+
public TripleHttp2SettingsHandler() {}
33+
34+
@Override
35+
protected void channelRead0(ChannelHandlerContext ctx, Http2SettingsFrame msg) throws Exception {
36+
final ConnectionHandler connectionHandler =
37+
(ConnectionHandler) ctx.pipeline().get(Constants.CONNECTION_HANDLER_NAME);
38+
if (logger.isDebugEnabled()) {
39+
logger.debug("Receive Http2 Settings frame of " + ctx.channel().localAddress() + " -> "
40+
+ ctx.channel().remoteAddress());
41+
}
42+
// Notify the connection preface is received.
43+
connectionHandler.onConnectionPrefaceReceived(ctx.channel());
44+
}
45+
}

0 commit comments

Comments
 (0)