Skip to content

Commit e38c8ec

Browse files
authored
feat: add stream method for ServerStream (#1575)
* feat: add stream method for ServerStream * add integration test * fix integration test * fix format * add javadoc * fix format * add exception * remove unused exception in tests
1 parent ffeb820 commit e38c8ec

File tree

3 files changed

+50
-0
lines changed

3 files changed

+50
-0
lines changed

gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java

+11
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131

3232
import com.google.api.core.InternalApi;
3333
import java.util.Iterator;
34+
import java.util.stream.Stream;
35+
import java.util.stream.StreamSupport;
3436
import javax.annotation.Nonnull;
3537

3638
/**
@@ -89,6 +91,15 @@ public Iterator<V> iterator() {
8991
return iterator;
9092
}
9193

94+
/**
95+
* Returns a sequential {@code Stream} with server responses as its source.
96+
*
97+
* @return a sequential {@code Stream} over the elements in server responses
98+
*/
99+
public Stream<V> stream() {
100+
return StreamSupport.stream(this.spliterator(), false);
101+
}
102+
92103
/**
93104
* Returns true if the next call to the iterator's hasNext() or next() is guaranteed to be
94105
* nonblocking.

gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamTest.java

+26
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.concurrent.Executors;
4242
import java.util.concurrent.Future;
4343
import java.util.concurrent.TimeUnit;
44+
import java.util.stream.Collectors;
4445
import org.junit.After;
4546
import org.junit.Before;
4647
import org.junit.Test;
@@ -109,6 +110,31 @@ public List<Integer> call() {
109110
Truth.assertThat(results).containsExactly(0, 1, 2, 3, 4);
110111
}
111112

113+
@Test
114+
public void testMultipleItemStreamMethod() throws Exception {
115+
Future<Void> producerFuture =
116+
executor.submit(
117+
() -> {
118+
for (int i = 0; i < 5; i++) {
119+
int requestCount = controller.popLastPull();
120+
121+
Truth.assertWithMessage("ServerStream should request one item at a time")
122+
.that(requestCount)
123+
.isEqualTo(1);
124+
125+
stream.observer().onResponse(i);
126+
}
127+
stream.observer().onComplete();
128+
return null;
129+
});
130+
Future<List<Integer>> consumerFuture =
131+
executor.submit(() -> stream.stream().collect(Collectors.toList()));
132+
133+
producerFuture.get(60, TimeUnit.SECONDS);
134+
List<Integer> results = consumerFuture.get();
135+
Truth.assertThat(results).containsExactly(0, 1, 2, 3, 4);
136+
}
137+
112138
@Test
113139
public void testEarlyTermination() throws Exception {
114140
Future<Void> taskFuture =

showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java

+13
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.showcase.v1beta1.it.util.TestClientInitializer;
3131
import java.util.ArrayList;
3232
import java.util.Iterator;
33+
import java.util.stream.Collectors;
3334
import org.junit.After;
3435
import org.junit.Before;
3536
import org.junit.Ignore;
@@ -71,6 +72,18 @@ public void testGrpc_receiveStreamedContent() {
7172
.inOrder();
7273
}
7374

75+
@Test
76+
public void testGrpc_receiveStreamedContentStreamAPI() {
77+
String content = "The rain in Spain stays mainly on the plain!";
78+
ServerStream<EchoResponse> responseStream =
79+
grpcClient.expandCallable().call(ExpandRequest.newBuilder().setContent(content).build());
80+
assertThat(responseStream.stream().map(EchoResponse::getContent).collect(Collectors.toList()))
81+
.containsExactlyElementsIn(
82+
ImmutableList.of(
83+
"The", "rain", "in", "Spain", "stays", "mainly", "on", "the", "plain!"))
84+
.inOrder();
85+
}
86+
7487
@Test
7588
public void testGrpc_serverError_receiveErrorAfterLastWordInStream() {
7689
String content = "The rain in Spain";

0 commit comments

Comments
 (0)