Skip to content

Commit c31164a

Browse files
authored
[3.3] Add http2 client connection preface process (#15436)
* Add http2 client connection preface process * Add comments to explain the reason of adding client connection preface processing
1 parent 6939e3c commit c31164a

File tree

2 files changed

+143
-0
lines changed

2 files changed

+143
-0
lines changed

dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,31 +17,45 @@
1717
package org.apache.dubbo.remoting.transport.netty4;
1818

1919
import org.apache.dubbo.common.URL;
20+
import org.apache.dubbo.common.Version;
21+
import org.apache.dubbo.common.utils.NetUtils;
2022
import org.apache.dubbo.remoting.ChannelHandler;
2123
import org.apache.dubbo.remoting.Constants;
2224
import org.apache.dubbo.remoting.RemotingException;
2325
import org.apache.dubbo.remoting.api.WireProtocol;
26+
import org.apache.dubbo.remoting.transport.netty4.http2.Http2ClientSettingsHandler;
2427
import org.apache.dubbo.remoting.transport.netty4.ssl.SslClientTlsHandler;
2528
import org.apache.dubbo.remoting.transport.netty4.ssl.SslContexts;
2629
import org.apache.dubbo.remoting.utils.UrlUtils;
2730

31+
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicReference;
33+
2834
import io.netty.bootstrap.Bootstrap;
2935
import io.netty.buffer.PooledByteBufAllocator;
3036
import io.netty.channel.ChannelFuture;
37+
import io.netty.channel.ChannelHandlerContext;
3138
import io.netty.channel.ChannelInitializer;
3239
import io.netty.channel.ChannelOption;
3340
import io.netty.channel.ChannelPipeline;
3441
import io.netty.channel.socket.SocketChannel;
42+
import io.netty.handler.codec.http2.Http2FrameCodec;
3543
import io.netty.handler.ssl.SslContext;
3644
import io.netty.handler.timeout.IdleStateHandler;
45+
import io.netty.util.concurrent.DefaultPromise;
46+
import io.netty.util.concurrent.GlobalEventExecutor;
47+
import io.netty.util.concurrent.Promise;
3748

3849
import static java.util.concurrent.TimeUnit.MILLISECONDS;
50+
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_CLIENT_CONNECT_TIMEOUT;
3951
import static org.apache.dubbo.remoting.transport.netty4.NettyEventLoopFactory.socketChannelClass;
4052

4153
public final class NettyConnectionClient extends AbstractNettyConnectionClient {
4254

4355
private Bootstrap bootstrap;
4456

57+
private AtomicReference<Promise<Void>> connectionPrefaceReceivedPromiseRef;
58+
4559
public NettyConnectionClient(URL url, ChannelHandler handler) throws RemotingException {
4660
super(url, handler);
4761
}
@@ -87,6 +101,16 @@ protected void initChannel(SocketChannel ch) {
87101

88102
NettyConfigOperator operator = new NettyConfigOperator(nettyChannel, getChannelHandler());
89103
protocol.configClientPipeline(getUrl(), operator, nettySslContextOperator);
104+
105+
ChannelHandlerContext http2FrameCodecHandlerCtx = pipeline.context(Http2FrameCodec.class);
106+
if (http2FrameCodecHandlerCtx != null) {
107+
connectionPrefaceReceivedPromiseRef = new AtomicReference<>();
108+
pipeline.addAfter(
109+
http2FrameCodecHandlerCtx.name(),
110+
"client-connection-preface-handler",
111+
new Http2ClientSettingsHandler(connectionPrefaceReceivedPromiseRef));
112+
}
113+
90114
// set null but do not close this client, it will be reconnecting in the future
91115
ch.closeFuture().addListener(channelFuture -> clearNettyChannel());
92116
// TODO support Socks5
@@ -97,6 +121,71 @@ protected void initChannel(SocketChannel ch) {
97121

98122
@Override
99123
protected ChannelFuture performConnect() {
124+
if (connectionPrefaceReceivedPromiseRef != null) {
125+
connectionPrefaceReceivedPromiseRef.compareAndSet(null, new DefaultPromise<>(GlobalEventExecutor.INSTANCE));
126+
}
100127
return bootstrap.connect();
101128
}
129+
130+
@Override
131+
protected void doConnect() throws RemotingException {
132+
long start = System.currentTimeMillis();
133+
super.doConnect();
134+
waitConnectionPreface(start);
135+
}
136+
137+
/**
138+
* Wait connection preface
139+
* <br>
140+
* Http2 client should set max header list size of http2 encoder based on server connection preface before
141+
* sending first data frame, otherwise the http2 server might send back GO_AWAY frame and disconnect the connection
142+
* immediately if the size of client Headers frame is bigger than the MAX_HEADER_LIST_SIZE of server settings.<br>
143+
* @see <a href="https://httpwg.org/specs/rfc7540.html#ConnectionHeader">HTTP/2 Connection Preface</a><br>
144+
* In HTTP/2, each endpoint is required to send a connection preface as a final confirmation of the protocol
145+
* in use and to establish the initial settings for the HTTP/2 connection. The client and server each send a
146+
* different connection preface. The client connection preface starts with a sequence of 24 octets,
147+
* which in hex notation is:<br>
148+
* 0x505249202a20485454502f322e300d0a0d0a534d0d0a0d0a<br>
149+
* That is, the connection preface starts with the string PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n<br>
150+
* This sequence MUST be followed by a SETTINGS frame (Section 6.5), which MAY be empty.
151+
* The server connection preface consists of a potentially empty SETTINGS frame (Section 6.5) that MUST be
152+
* the first frame the server sends in the HTTP/2 connection.
153+
*
154+
* @param start start time of doConnect in milliseconds.
155+
*/
156+
private void waitConnectionPreface(long start) throws RemotingException {
157+
if (connectionPrefaceReceivedPromiseRef == null) {
158+
return;
159+
}
160+
Promise<Void> connectionPrefaceReceivedPromise = connectionPrefaceReceivedPromiseRef.get();
161+
if (connectionPrefaceReceivedPromise != null) {
162+
long retainedTimeout = getConnectTimeout() - System.currentTimeMillis() + start;
163+
boolean ret = connectionPrefaceReceivedPromise.awaitUninterruptibly(retainedTimeout, TimeUnit.MILLISECONDS);
164+
// Only process once: destroy connectionPrefaceReceivedPromise after used
165+
synchronized (this) {
166+
connectionPrefaceReceivedPromiseRef.set(null);
167+
}
168+
if (!ret || !connectionPrefaceReceivedPromise.isSuccess()) {
169+
// 6-2 Client-side connection preface timeout
170+
RemotingException remotingException = new RemotingException(
171+
this,
172+
"client(url: " + getUrl() + ") failed to connect to server " + getConnectAddress()
173+
+ " client-side connection preface timeout " + getConnectTimeout()
174+
+ "ms (elapsed: "
175+
+ (System.currentTimeMillis() - start) + "ms) from netty client "
176+
+ NetUtils.getLocalHost()
177+
+ " using dubbo version "
178+
+ Version.getVersion());
179+
180+
logger.error(
181+
TRANSPORT_CLIENT_CONNECT_TIMEOUT,
182+
"provider crash",
183+
"",
184+
"Client-side connection preface timeout",
185+
remotingException);
186+
187+
throw remotingException;
188+
}
189+
}
190+
}
102191
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dubbo.remoting.transport.netty4.http2;
18+
19+
import org.apache.dubbo.common.logger.Logger;
20+
import org.apache.dubbo.common.logger.LoggerFactory;
21+
22+
import java.util.concurrent.atomic.AtomicReference;
23+
24+
import io.netty.channel.ChannelHandlerContext;
25+
import io.netty.channel.SimpleChannelInboundHandler;
26+
import io.netty.handler.codec.http2.Http2SettingsFrame;
27+
import io.netty.util.concurrent.Promise;
28+
29+
public class Http2ClientSettingsHandler extends SimpleChannelInboundHandler<Http2SettingsFrame> {
30+
31+
private static final Logger logger = LoggerFactory.getLogger(Http2ClientSettingsHandler.class);
32+
33+
private final AtomicReference<Promise<Void>> connectionPrefaceReceivedPromiseRef;
34+
35+
public Http2ClientSettingsHandler(AtomicReference<Promise<Void>> connectionPrefaceReceivedPromiseRef) {
36+
this.connectionPrefaceReceivedPromiseRef = connectionPrefaceReceivedPromiseRef;
37+
}
38+
39+
@Override
40+
protected void channelRead0(ChannelHandlerContext ctx, Http2SettingsFrame msg) throws Exception {
41+
if (logger.isDebugEnabled()) {
42+
logger.debug("Receive Http2 Settings frame of " + ctx.channel().localAddress() + " -> "
43+
+ ctx.channel().remoteAddress());
44+
}
45+
// connectionPrefaceReceivedPromise will be set null after first used.
46+
Promise<Void> connectionPrefaceReceivedPromise = connectionPrefaceReceivedPromiseRef.get();
47+
if (connectionPrefaceReceivedPromise == null) {
48+
ctx.fireChannelRead(msg);
49+
} else {
50+
// Notify the connection preface is received when first inbound http2 settings frame is arrived.
51+
connectionPrefaceReceivedPromise.trySuccess(null);
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)