From c1adfdd58b3684bd06c56130156f21b70e58a1e1 Mon Sep 17 00:00:00 2001 From: Spencer Fang Date: Tue, 13 Mar 2018 20:45:49 -0700 Subject: [PATCH 1/8] netty,okhttp,testing: always set TRANSPORT_ATTR_REMOTE_ADDR Always set the remote address, no reason why this should be a TLS-only feature. This is needed for channelz, and is especially useful in unit tests where we are using plaintext. --- .../io/grpc/netty/ProtocolNegotiators.java | 39 +++++++++++++++---- .../io/grpc/okhttp/OkHttpClientStream.java | 8 +++- .../io/grpc/okhttp/OkHttpClientTransport.java | 12 +++++- .../testing/AbstractTransportTest.java | 6 +-- 4 files changed, 50 insertions(+), 15 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java index c45a6ac6b26..c3292b26c8e 100644 --- a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java +++ b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java @@ -199,7 +199,7 @@ public Handler newHandler(GrpcHttp2ConnectionHandler http2Handler) { proxyHandler = new HttpProxyHandler(proxyAddress, proxyUsername, proxyPassword); } return new BufferUntilProxyTunnelledHandler( - proxyHandler, negotiator.newHandler(http2Handler)); + proxyHandler, negotiator.newHandler(http2Handler), http2Handler); } } @@ -212,11 +212,15 @@ public Handler newHandler(GrpcHttp2ConnectionHandler http2Handler) { static final class BufferUntilProxyTunnelledHandler extends AbstractBufferingHandler implements ProtocolNegotiator.Handler { private final ProtocolNegotiator.Handler originalHandler; + private final GrpcHttp2ConnectionHandler grpcHandler; public BufferUntilProxyTunnelledHandler( - ProxyHandler proxyHandler, ProtocolNegotiator.Handler handler) { + ProxyHandler proxyHandler, + ProtocolNegotiator.Handler handler, + GrpcHttp2ConnectionHandler grpcHandler) { super(proxyHandler, handler); this.originalHandler = handler; + this.grpcHandler = grpcHandler; } @@ -229,6 +233,11 @@ public AsciiString scheme() { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof ProxyConnectionEvent) { writeBufferedAndRemove(ctx); + grpcHandler.handleProtocolNegotiationCompleted( + Attributes.newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) + .build() + ); } super.userEventTriggered(ctx, evt); } @@ -329,7 +338,7 @@ public Handler newHandler(GrpcHttp2ConnectionHandler handler) { HttpClientCodec httpClientCodec = new HttpClientCodec(); final HttpClientUpgradeHandler upgrader = new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000); - return new BufferingHttp2UpgradeHandler(upgrader); + return new BufferingHttp2UpgradeHandler(upgrader, handler); } } @@ -662,8 +671,11 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc private static class BufferUntilChannelActiveHandler extends AbstractBufferingHandler implements ProtocolNegotiator.Handler { - BufferUntilChannelActiveHandler(ChannelHandler... handlers) { - super(handlers); + private final GrpcHttp2ConnectionHandler handler; + + BufferUntilChannelActiveHandler(GrpcHttp2ConnectionHandler handler) { + super(handler); + this.handler = handler; } @Override @@ -679,6 +691,11 @@ public void handlerAdded(ChannelHandlerContext ctx) { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { writeBufferedAndRemove(ctx); + handler.handleProtocolNegotiationCompleted( + Attributes + .newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) + .build()); super.channelActive(ctx); } } @@ -689,8 +706,11 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { private static class BufferingHttp2UpgradeHandler extends AbstractBufferingHandler implements ProtocolNegotiator.Handler { - BufferingHttp2UpgradeHandler(ChannelHandler... handlers) { - super(handlers); + private final GrpcHttp2ConnectionHandler grpcHandler; + + BufferingHttp2UpgradeHandler(ChannelHandler handler, GrpcHttp2ConnectionHandler grpcHandler) { + super(handler); + this.grpcHandler = grpcHandler; } @Override @@ -712,6 +732,11 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) { writeBufferedAndRemove(ctx); + grpcHandler.handleProtocolNegotiationCompleted( + Attributes + .newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) + .build()); } else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) { fail(ctx, unavailableException("HTTP/2 upgrade rejected")); } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 4cb6153a9c4..88bfa28b8f7 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -123,7 +123,7 @@ public void setAuthority(String authority) { @Override public Attributes getAttributes() { - return Attributes.EMPTY; + return transportState().getAttributes(); } class Sink implements AbstractClientStream.Sink { @@ -318,6 +318,12 @@ public void transportDataReceived(okio.Buffer frame, boolean endOfStream) { super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream); } + public Attributes getAttributes() { + synchronized (lock) { + return transport.getAttributes(); + } + } + @GuardedBy("lock") private void onEndOfStream() { if (!framer().isClosed()) { diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index a1fc9fa1c34..344e5b1da53 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -31,6 +31,7 @@ import com.squareup.okhttp.internal.http.StatusLine; import io.grpc.Attributes; import io.grpc.CallOptions; +import io.grpc.Grpc; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; @@ -149,6 +150,7 @@ private static Map buildErrorCodeToStatusMap() { private final int maxMessageSize; private int connectionUnacknowledgedBytesRead; private ClientFrameHandler clientFrameHandler; + private Attributes attributes; /** * Indicates the transport is in go-away state: no new streams will be processed, but existing * streams may continue. @@ -485,6 +487,13 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() startPendingStreams(); } + // TODO(zhangkun83): fill channel security attributes + // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info + attributes = Attributes + .newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, socket.getRemoteSocketAddress()) + .build(); + rawFrameWriter = variant.newWriter(sink, true); frameWriter.becomeConnected(rawFrameWriter, socket); @@ -675,8 +684,7 @@ public void shutdownNow(Status reason) { @Override public Attributes getAttributes() { - // TODO(zhangkun83): fill channel security attributes - return Attributes.EMPTY; + return attributes; } /** diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index 8bb69216580..2ea52f3cfb6 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -68,8 +68,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Arrays; import java.util.List; @@ -1709,9 +1707,7 @@ public void socketStats_addresses() throws Exception { = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); ServerStream serverStream = serverStreamCreation.stream; - // clients do not have TRANSPORT_ATTR_REMOTE_ADDR so use a hack for serverAddress - SocketAddress serverAddress - = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), server.getPort()); + SocketAddress serverAddress = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); SocketAddress clientAddress = serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); SocketStats clientSocketStats = client.getStats().get(); From 20558cbbb4ec8b9a5f2e156a958d887118e3d826 Mon Sep 17 00:00:00 2001 From: Spencer Fang Date: Wed, 14 Mar 2018 16:25:40 -0700 Subject: [PATCH 2/8] fix locking --- .../io/grpc/okhttp/OkHttpClientTransport.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 344e5b1da53..0b722d90cba 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -150,7 +150,8 @@ private static Map buildErrorCodeToStatusMap() { private final int maxMessageSize; private int connectionUnacknowledgedBytesRead; private ClientFrameHandler clientFrameHandler; - private Attributes attributes; + // Can be read any time but only set by start() + private volatile Attributes attributes = Attributes.EMPTY; /** * Indicates the transport is in go-away state: no new streams will be processed, but existing * streams may continue. @@ -453,6 +454,7 @@ public void close() {} Variant variant = new Http2(); BufferedSink sink; Socket sock; + Attributes attrs; try { if (proxy == null) { sock = new Socket(address.getAddress(), address.getPort()); @@ -469,6 +471,10 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() sock.setTcpNoDelay(true); source = Okio.buffer(Okio.source(sock)); sink = Okio.buffer(Okio.sink(sock)); + attrs = Attributes + .newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress()) + .build(); } catch (StatusException e) { startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus()); return; @@ -485,14 +491,11 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() socket = sock; maxConcurrentStreams = Integer.MAX_VALUE; startPendingStreams(); - } - // TODO(zhangkun83): fill channel security attributes - // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info - attributes = Attributes - .newBuilder() - .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, socket.getRemoteSocketAddress()) - .build(); + // TODO(zhangkun83): fill channel security attributes + // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info + attributes = attrs; + } rawFrameWriter = variant.newWriter(sink, true); frameWriter.becomeConnected(rawFrameWriter, socket); From b15db89feed43c8c120dd92348b8d47d595be06e Mon Sep 17 00:00:00 2001 From: Spencer Fang Date: Wed, 14 Mar 2018 17:45:02 -0700 Subject: [PATCH 3/8] suppress @GuardedBy --- okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 88bfa28b8f7..004827e5ee1 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -318,10 +318,10 @@ public void transportDataReceived(okio.Buffer frame, boolean endOfStream) { super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream); } + // Safe to call without lock + @SuppressWarnings("GuardedBy") public Attributes getAttributes() { - synchronized (lock) { - return transport.getAttributes(); - } + return transport.getAttributes(); } @GuardedBy("lock") From f6c1bae035464d129622c652463d3daa8099f87b Mon Sep 17 00:00:00 2001 From: Spencer Fang Date: Thu, 15 Mar 2018 14:25:54 -0700 Subject: [PATCH 4/8] remove unneeded volatile, remove interference with proxy --- .../main/java/io/grpc/netty/ProtocolNegotiators.java | 12 ++---------- .../java/io/grpc/okhttp/OkHttpClientTransport.java | 7 ++++--- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java index c3292b26c8e..df534acfd43 100644 --- a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java +++ b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java @@ -199,7 +199,7 @@ public Handler newHandler(GrpcHttp2ConnectionHandler http2Handler) { proxyHandler = new HttpProxyHandler(proxyAddress, proxyUsername, proxyPassword); } return new BufferUntilProxyTunnelledHandler( - proxyHandler, negotiator.newHandler(http2Handler), http2Handler); + proxyHandler, negotiator.newHandler(http2Handler)); } } @@ -212,15 +212,12 @@ public Handler newHandler(GrpcHttp2ConnectionHandler http2Handler) { static final class BufferUntilProxyTunnelledHandler extends AbstractBufferingHandler implements ProtocolNegotiator.Handler { private final ProtocolNegotiator.Handler originalHandler; - private final GrpcHttp2ConnectionHandler grpcHandler; public BufferUntilProxyTunnelledHandler( ProxyHandler proxyHandler, - ProtocolNegotiator.Handler handler, - GrpcHttp2ConnectionHandler grpcHandler) { + ProtocolNegotiator.Handler handler) { super(proxyHandler, handler); this.originalHandler = handler; - this.grpcHandler = grpcHandler; } @@ -233,11 +230,6 @@ public AsciiString scheme() { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof ProxyConnectionEvent) { writeBufferedAndRemove(ctx); - grpcHandler.handleProtocolNegotiationCompleted( - Attributes.newBuilder() - .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) - .build() - ); } super.userEventTriggered(ctx, evt); } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 0b722d90cba..de93ca6120a 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -150,8 +150,8 @@ private static Map buildErrorCodeToStatusMap() { private final int maxMessageSize; private int connectionUnacknowledgedBytesRead; private ClientFrameHandler clientFrameHandler; - // Can be read any time but only set by start() - private volatile Attributes attributes = Attributes.EMPTY; + // Caution: Not synchronized, new value can only be safely read after the connection is complete. + private Attributes attributes = Attributes.EMPTY; /** * Indicates the transport is in go-away state: no new streams will be processed, but existing * streams may continue. @@ -490,11 +490,12 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() synchronized (lock) { socket = sock; maxConcurrentStreams = Integer.MAX_VALUE; - startPendingStreams(); // TODO(zhangkun83): fill channel security attributes // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info attributes = attrs; + + startPendingStreams(); } rawFrameWriter = variant.newWriter(sink, true); From e874fab26ba5032a2545c46abbe8fc7ff5b056a8 Mon Sep 17 00:00:00 2001 From: Spencer Fang Date: Thu, 15 Mar 2018 14:28:05 -0700 Subject: [PATCH 5/8] undo whitespace --- netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java index df534acfd43..398ef8838a0 100644 --- a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java +++ b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java @@ -214,8 +214,7 @@ static final class BufferUntilProxyTunnelledHandler extends AbstractBufferingHan private final ProtocolNegotiator.Handler originalHandler; public BufferUntilProxyTunnelledHandler( - ProxyHandler proxyHandler, - ProtocolNegotiator.Handler handler) { + ProxyHandler proxyHandler, ProtocolNegotiator.Handler handler) { super(proxyHandler, handler); this.originalHandler = handler; } From 557aa9f3d86f494ec8078ec1271e833c98e6b4b3 Mon Sep 17 00:00:00 2001 From: Spencer Fang Date: Thu, 15 Mar 2018 15:25:47 -0700 Subject: [PATCH 6/8] set attributes directly --- .../main/java/io/grpc/okhttp/OkHttpClientTransport.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index de93ca6120a..f710399c861 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -454,7 +454,6 @@ public void close() {} Variant variant = new Http2(); BufferedSink sink; Socket sock; - Attributes attrs; try { if (proxy == null) { sock = new Socket(address.getAddress(), address.getPort()); @@ -471,7 +470,9 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() sock.setTcpNoDelay(true); source = Okio.buffer(Okio.source(sock)); sink = Okio.buffer(Okio.sink(sock)); - attrs = Attributes + // TODO(zhangkun83): fill channel security attributes + // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info + attributes = Attributes .newBuilder() .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress()) .build(); @@ -491,10 +492,6 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() socket = sock; maxConcurrentStreams = Integer.MAX_VALUE; - // TODO(zhangkun83): fill channel security attributes - // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info - attributes = attrs; - startPendingStreams(); } From 4588c089d04482b7639dabcf1add13407f5bd027 Mon Sep 17 00:00:00 2001 From: Spencer Fang Date: Fri, 16 Mar 2018 16:26:37 -0700 Subject: [PATCH 7/8] PR comments - copy the attr at construction time - update unit tests to wait until transport is ready before creating streams --- .../io/grpc/okhttp/OkHttpClientStream.java | 13 ++-- .../testing/AbstractTransportTest.java | 75 +++++++++++-------- 2 files changed, 48 insertions(+), 40 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 004827e5ee1..5fb1388df73 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -58,6 +58,7 @@ class OkHttpClientStream extends AbstractClientStream { private volatile int id = ABSENT_ID; private final TransportState state; private final Sink sink = new Sink(); + private final Attributes attributes; private boolean useGet = false; @@ -83,6 +84,10 @@ class OkHttpClientStream extends AbstractClientStream { this.method = method; this.authority = authority; this.userAgent = userAgent; + // OkHttpClientStream is only created after the transport has finished connecting, + // so it is safe to read the transport attributes. + // We make a copy here for convenience, even though we can ask the transport. + this.attributes = transport.getAttributes(); this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, frameWriter, outboundFlow, transport); } @@ -123,7 +128,7 @@ public void setAuthority(String authority) { @Override public Attributes getAttributes() { - return transportState().getAttributes(); + return attributes; } class Sink implements AbstractClientStream.Sink { @@ -318,12 +323,6 @@ public void transportDataReceived(okio.Buffer frame, boolean endOfStream) { super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream); } - // Safe to call without lock - @SuppressWarnings("GuardedBy") - public Attributes getAttributes() { - return transport.getAttributes(); - } - @GuardedBy("lock") private void onEndOfStream() { if (!framer().isClosed()) { diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index 2ea52f3cfb6..4fef0d94992 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -228,7 +228,7 @@ protected long currentTimeMillis() { public void frameAfterRstStreamShouldNotBreakClientChannel() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -295,7 +295,7 @@ public void clientStartStop() throws Exception { server.start(serverListener); client = newClientTransport(server); InOrder inOrder = inOrder(mockClientTransportListener); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called"); client.shutdown(shutdownReason); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); @@ -309,7 +309,7 @@ public void clientStartAndStopOnceConnected() throws Exception { server.start(serverListener); client = newClientTransport(server); InOrder inOrder = inOrder(mockClientTransportListener); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); @@ -337,7 +337,7 @@ public void serverAlreadyListening() throws Exception { public void openStreamPreventsTermination() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -389,7 +389,7 @@ public void openStreamPreventsTermination() throws Exception { public void shutdownNowKillsClientStream() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -426,7 +426,7 @@ public void shutdownNowKillsClientStream() throws Exception { public void shutdownNowKillsServerStream() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -467,7 +467,7 @@ public void shutdownNowKillsServerStream() throws Exception { public void ping() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class); try { client.ping(mockPingCallback, MoreExecutors.directExecutor()); @@ -483,7 +483,7 @@ public void ping() throws Exception { public void ping_duringShutdown() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); // Stream prevents termination ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); @@ -505,7 +505,7 @@ public void ping_duringShutdown() throws Exception { public void ping_afterTermination() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called"); client.shutdown(shutdownReason); @@ -527,7 +527,7 @@ public void newStream_duringShutdown() throws Exception { InOrder inOrder = inOrder(clientStreamTracerFactory); server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); // Stream prevents termination ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions); inOrder.verify(clientStreamTracerFactory).newClientStreamTracer( @@ -567,7 +567,7 @@ public void newStream_afterTermination() throws Exception { // dealing with afterTermination is harder than duringShutdown. server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called"); client.shutdown(shutdownReason); @@ -591,7 +591,7 @@ public void newStream_afterTermination() throws Exception { public void transportInUse_normalClose() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientStream stream1 = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener1 = new ClientStreamListenerBase(); stream1.start(clientStreamListener1); @@ -621,7 +621,7 @@ public void transportInUse_normalClose() throws Exception { public void transportInUse_clientCancel() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientStream stream1 = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener1 = new ClientStreamListenerBase(); stream1.start(clientStreamListener1); @@ -646,7 +646,7 @@ public void basicStream() throws Exception { InOrder serverInOrder = inOrder(serverStreamTracerFactory); server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -801,7 +801,7 @@ public void basicStream() throws Exception { public void authorityPropagation() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); @@ -820,7 +820,7 @@ public void authorityPropagation() throws Exception { public void zeroMessageStream() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -856,7 +856,7 @@ public void zeroMessageStream() throws Exception { public void earlyServerClose_withServerHeaders() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -891,7 +891,7 @@ public void earlyServerClose_withServerHeaders() throws Exception { public void earlyServerClose_noServerHeaders() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -935,7 +935,7 @@ public void earlyServerClose_noServerHeaders() throws Exception { public void earlyServerClose_serverFailure() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1013,7 +1013,7 @@ public void closed( public void clientCancel() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1044,7 +1044,7 @@ public void clientCancel() throws Exception { public void clientCancelFromWithinMessageRead() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1131,7 +1131,7 @@ public void onReady() { public void serverCancel() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1179,7 +1179,7 @@ public void flowControlPushBack() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1340,7 +1340,7 @@ private int verifyMessageCountAndClose(BlockingQueue messageQueue, public void interactionsAfterServerStreamCloseAreNoops() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1372,7 +1372,7 @@ public void interactionsAfterServerStreamCloseAreNoops() throws Exception { public void interactionsAfterClientStreamCancelAreNoops() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1408,7 +1408,7 @@ protected boolean haveTransportTracer() { public void transportTracer_streamStarted() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); if (!haveTransportTracer()) { @@ -1493,7 +1493,7 @@ public void transportTracer_streamStarted() throws Exception { public void transportTracer_server_streamEnded_ok() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1532,7 +1532,7 @@ public void transportTracer_server_streamEnded_ok() throws Exception { public void transportTracer_server_streamEnded_nonOk() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1572,7 +1572,7 @@ public void transportTracer_server_streamEnded_nonOk() throws Exception { public void transportTracer_client_streamEnded_nonOk() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1607,7 +1607,7 @@ public void transportTracer_client_streamEnded_nonOk() throws Exception { public void transportTracer_server_receive_msg() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1652,7 +1652,7 @@ public void transportTracer_server_receive_msg() throws Exception { public void transportTracer_server_send_msg() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1696,7 +1696,7 @@ public void transportTracer_server_send_msg() throws Exception { public void socketStats_addresses() throws Exception { server.start(serverListener); ManagedClientTransport client = newClientTransport(server); - runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); + startTransport(client, mockClientTransportListener); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1727,7 +1727,8 @@ public void socketStats_addresses() throws Exception { */ private void doPingPong(MockServerListener serverListener) throws Exception { ManagedClientTransport client = newClientTransport(server); - runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); + ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class); + startTransport(client, listener); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1792,6 +1793,14 @@ private static void runIfNotNull(Runnable runnable) { } } + private static void startTransport( + ManagedClientTransport clientTransport, + ManagedClientTransport.Listener listener) { + runIfNotNull(clientTransport.start(listener)); + verify(listener, timeout(100)).transportReady(); + } + + private static class MockServerListener implements ServerListener { public final BlockingQueue listeners = new LinkedBlockingQueue(); From 390d427d0261e685ad0c9f2a368149891db1dedc Mon Sep 17 00:00:00 2001 From: Spencer Fang Date: Fri, 23 Mar 2018 15:05:42 -0700 Subject: [PATCH 8/8] remove unneeded whitespace --- .../java/io/grpc/internal/testing/AbstractTransportTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index 4fef0d94992..073251967fb 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -1800,7 +1800,6 @@ private static void startTransport( verify(listener, timeout(100)).transportReady(); } - private static class MockServerListener implements ServerListener { public final BlockingQueue listeners = new LinkedBlockingQueue();