Skip to content

netty,okhttp,testing: always set TRANSPORT_ATTR_REMOTE_ADDR #4217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 23, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
HttpClientCodec httpClientCodec = new HttpClientCodec();
final HttpClientUpgradeHandler upgrader =
new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000);
return new BufferingHttp2UpgradeHandler(upgrader);
return new BufferingHttp2UpgradeHandler(upgrader, handler);
}
}

Expand Down Expand Up @@ -662,8 +662,11 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
private static class BufferUntilChannelActiveHandler extends AbstractBufferingHandler
implements ProtocolNegotiator.Handler {

BufferUntilChannelActiveHandler(ChannelHandler... handlers) {
super(handlers);
private final GrpcHttp2ConnectionHandler handler;

BufferUntilChannelActiveHandler(GrpcHttp2ConnectionHandler handler) {
super(handler);
this.handler = handler;
}

@Override
Expand All @@ -679,6 +682,11 @@ public void handlerAdded(ChannelHandlerContext ctx) {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
writeBufferedAndRemove(ctx);
handler.handleProtocolNegotiationCompleted(
Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.build());
super.channelActive(ctx);
}
}
Expand All @@ -689,8 +697,11 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
private static class BufferingHttp2UpgradeHandler extends AbstractBufferingHandler
implements ProtocolNegotiator.Handler {

BufferingHttp2UpgradeHandler(ChannelHandler... handlers) {
super(handlers);
private final GrpcHttp2ConnectionHandler grpcHandler;

BufferingHttp2UpgradeHandler(ChannelHandler handler, GrpcHttp2ConnectionHandler grpcHandler) {
super(handler);
this.grpcHandler = grpcHandler;
}

@Override
Expand All @@ -712,6 +723,11 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
writeBufferedAndRemove(ctx);
grpcHandler.handleProtocolNegotiationCompleted(
Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.build());
} else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
fail(ctx, unavailableException("HTTP/2 upgrade rejected"));
}
Expand Down
8 changes: 7 additions & 1 deletion okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void setAuthority(String authority) {

@Override
public Attributes getAttributes() {
return Attributes.EMPTY;
return transportState().getAttributes();
}

class Sink implements AbstractClientStream.Sink {
Expand Down Expand Up @@ -318,6 +318,12 @@ public void transportDataReceived(okio.Buffer frame, boolean endOfStream) {
super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream);
}

// Safe to call without lock
@SuppressWarnings("GuardedBy")
public Attributes getAttributes() {
return transport.getAttributes();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can get the attributes and save it as a field at TransportState constructor, the transport is guaranteed in READY state when the constructor is called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. This required updating some unit tests to ensure streams are not started until the transport is ready.


@GuardedBy("lock")
private void onEndOfStream() {
if (!framer().isClosed()) {
Expand Down
16 changes: 14 additions & 2 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.squareup.okhttp.internal.http.StatusLine;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
Expand Down Expand Up @@ -149,6 +150,8 @@ private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
private final int maxMessageSize;
private int connectionUnacknowledgedBytesRead;
private ClientFrameHandler clientFrameHandler;
// Caution: Not synchronized, new value can only be safely read after the connection is complete.
private Attributes attributes = Attributes.EMPTY;
/**
* Indicates the transport is in go-away state: no new streams will be processed, but existing
* streams may continue.
Expand Down Expand Up @@ -451,6 +454,7 @@ public void close() {}
Variant variant = new Http2();
BufferedSink sink;
Socket sock;
Attributes attrs;
try {
if (proxy == null) {
sock = new Socket(address.getAddress(), address.getPort());
Expand All @@ -467,6 +471,10 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
sock.setTcpNoDelay(true);
source = Okio.buffer(Okio.source(sock));
sink = Okio.buffer(Okio.sink(sock));
attrs = Attributes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can set attributes field directly here. Otherwise LGTM.

.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
.build();
} catch (StatusException e) {
startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
return;
Expand All @@ -482,6 +490,11 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
synchronized (lock) {
socket = sock;
maxConcurrentStreams = Integer.MAX_VALUE;

// TODO(zhangkun83): fill channel security attributes
// The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
attributes = attrs;

startPendingStreams();
}

Expand Down Expand Up @@ -675,8 +688,7 @@ public void shutdownNow(Status reason) {

@Override
public Attributes getAttributes() {
// TODO(zhangkun83): fill channel security attributes
return Attributes.EMPTY;
return attributes;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -1709,9 +1707,7 @@ public void socketStats_addresses() throws Exception {
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;

// clients do not have TRANSPORT_ATTR_REMOTE_ADDR so use a hack for serverAddress
SocketAddress serverAddress
= new InetSocketAddress(InetAddress.getByName("127.0.0.1"), server.getPort());
SocketAddress serverAddress = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
SocketAddress clientAddress = serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);

SocketStats clientSocketStats = client.getStats().get();
Expand Down