Skip to content

netty,okhttp,testing: always set TRANSPORT_ATTR_REMOTE_ADDR #4217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 23, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public Handler newHandler(GrpcHttp2ConnectionHandler http2Handler) {
proxyHandler = new HttpProxyHandler(proxyAddress, proxyUsername, proxyPassword);
}
return new BufferUntilProxyTunnelledHandler(
proxyHandler, negotiator.newHandler(http2Handler));
proxyHandler, negotiator.newHandler(http2Handler), http2Handler);
}
}

Expand All @@ -212,11 +212,15 @@ public Handler newHandler(GrpcHttp2ConnectionHandler http2Handler) {
static final class BufferUntilProxyTunnelledHandler extends AbstractBufferingHandler
implements ProtocolNegotiator.Handler {
private final ProtocolNegotiator.Handler originalHandler;
private final GrpcHttp2ConnectionHandler grpcHandler;

public BufferUntilProxyTunnelledHandler(
ProxyHandler proxyHandler, ProtocolNegotiator.Handler handler) {
ProxyHandler proxyHandler,
ProtocolNegotiator.Handler handler,
GrpcHttp2ConnectionHandler grpcHandler) {
super(proxyHandler, handler);
this.originalHandler = handler;
this.grpcHandler = grpcHandler;
}


Expand All @@ -229,6 +233,11 @@ public AsciiString scheme() {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ProxyConnectionEvent) {
writeBufferedAndRemove(ctx);
grpcHandler.handleProtocolNegotiationCompleted(
Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.build()
);
}
super.userEventTriggered(ctx, evt);
}
Expand Down Expand Up @@ -329,7 +338,7 @@ public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
HttpClientCodec httpClientCodec = new HttpClientCodec();
final HttpClientUpgradeHandler upgrader =
new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000);
return new BufferingHttp2UpgradeHandler(upgrader);
return new BufferingHttp2UpgradeHandler(upgrader, handler);
}
}

Expand Down Expand Up @@ -662,8 +671,11 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
private static class BufferUntilChannelActiveHandler extends AbstractBufferingHandler
implements ProtocolNegotiator.Handler {

BufferUntilChannelActiveHandler(ChannelHandler... handlers) {
super(handlers);
private final GrpcHttp2ConnectionHandler handler;

BufferUntilChannelActiveHandler(GrpcHttp2ConnectionHandler handler) {
super(handler);
this.handler = handler;
}

@Override
Expand All @@ -679,6 +691,11 @@ public void handlerAdded(ChannelHandlerContext ctx) {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
writeBufferedAndRemove(ctx);
handler.handleProtocolNegotiationCompleted(
Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.build());
super.channelActive(ctx);
}
}
Expand All @@ -689,8 +706,11 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
private static class BufferingHttp2UpgradeHandler extends AbstractBufferingHandler
implements ProtocolNegotiator.Handler {

BufferingHttp2UpgradeHandler(ChannelHandler... handlers) {
super(handlers);
private final GrpcHttp2ConnectionHandler grpcHandler;

BufferingHttp2UpgradeHandler(ChannelHandler handler, GrpcHttp2ConnectionHandler grpcHandler) {
super(handler);
this.grpcHandler = grpcHandler;
}

@Override
Expand All @@ -712,6 +732,11 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
writeBufferedAndRemove(ctx);
grpcHandler.handleProtocolNegotiationCompleted(
Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.build());
} else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
fail(ctx, unavailableException("HTTP/2 upgrade rejected"));
}
Expand Down
8 changes: 7 additions & 1 deletion okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void setAuthority(String authority) {

@Override
public Attributes getAttributes() {
return Attributes.EMPTY;
return transportState().getAttributes();
}

class Sink implements AbstractClientStream.Sink {
Expand Down Expand Up @@ -318,6 +318,12 @@ public void transportDataReceived(okio.Buffer frame, boolean endOfStream) {
super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream);
}

public Attributes getAttributes() {
synchronized (lock) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lock is not needed if attributes is volatile

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lock is required to make the @GuardedBy around the transport happy.

Copy link
Member

@dapengzhang0 dapengzhang0 Mar 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then maybe you can cache the attribute in TransportState constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the time TransportState is constructed, start may not have been called yet. Copying the attributes to a volatile at start or stream allocated will work, but adds more things for us to keep track of. I opted to suppress the warning, PTAL.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TransportState constructor is invoked by OkHttpClientStream constructor which in turn is invoked by OkHttpClientTransport.newStream(). The newStream() method on real transport won't be called until the real transport is ready (After onTransportReady() called).

  // InternalSubchannel.java
  /**
   * The transport for new outgoing requests. 'lock' must be held when assigning to it. Non-null
   * only in READY state.
   */
  @Nullable
  private volatile ManagedClientTransport activeTransport;

return transport.getAttributes();
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can get the attributes and save it as a field at TransportState constructor, the transport is guaranteed in READY state when the constructor is called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. This required updating some unit tests to ensure streams are not started until the transport is ready.


@GuardedBy("lock")
private void onEndOfStream() {
if (!framer().isClosed()) {
Expand Down
12 changes: 10 additions & 2 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.squareup.okhttp.internal.http.StatusLine;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
Expand Down Expand Up @@ -149,6 +150,7 @@ private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
private final int maxMessageSize;
private int connectionUnacknowledgedBytesRead;
private ClientFrameHandler clientFrameHandler;
private Attributes attributes;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • maybe volatile
  • maybe init with EMPTY

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, this needs to be volatile and set while holding the lock.

/**
* Indicates the transport is in go-away state: no new streams will be processed, but existing
* streams may continue.
Expand Down Expand Up @@ -485,6 +487,13 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
startPendingStreams();
}

// TODO(zhangkun83): fill channel security attributes
// The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
attributes = Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, socket.getRemoteSocketAddress())
.build();

rawFrameWriter = variant.newWriter(sink, true);
frameWriter.becomeConnected(rawFrameWriter, socket);

Expand Down Expand Up @@ -675,8 +684,7 @@ public void shutdownNow(Status reason) {

@Override
public Attributes getAttributes() {
// TODO(zhangkun83): fill channel security attributes
return Attributes.EMPTY;
return attributes;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -1709,9 +1707,7 @@ public void socketStats_addresses() throws Exception {
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;

// clients do not have TRANSPORT_ATTR_REMOTE_ADDR so use a hack for serverAddress
SocketAddress serverAddress
= new InetSocketAddress(InetAddress.getByName("127.0.0.1"), server.getPort());
SocketAddress serverAddress = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
SocketAddress clientAddress = serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);

SocketStats clientSocketStats = client.getStats().get();
Expand Down