Skip to content

Commit ea03d7a

Browse files
karenyrxabhita
authored andcommitted
[GRPC] Add integration tests for Bulk and Search GRPC endpoints (opensearch-project#18366)
* Add integration tests for Bulk and Search GRPC endpoints Signed-off-by: Karen Xu <[email protected]> * fix assertion for shard info and remove changelog entry Signed-off-by: Karen Xu <[email protected]> * use a common GRPC test base Signed-off-by: Karen Xu <[email protected]> --------- Signed-off-by: Karen Xu <[email protected]>
1 parent f4e7a27 commit ea03d7a

File tree

5 files changed

+307
-49
lines changed

5 files changed

+307
-49
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.transport.grpc;
10+
11+
import org.opensearch.plugin.transport.grpc.ssl.NettyGrpcClient;
12+
import org.opensearch.protobufs.BulkRequest;
13+
import org.opensearch.protobufs.BulkRequestBody;
14+
import org.opensearch.protobufs.BulkResponse;
15+
import org.opensearch.protobufs.IndexOperation;
16+
import org.opensearch.protobufs.services.DocumentServiceGrpc;
17+
18+
import io.grpc.ManagedChannel;
19+
20+
/**
21+
* Integration tests for the DocumentService gRPC service.
22+
*/
23+
public class DocumentServiceIT extends GrpcTransportBaseIT {
24+
25+
/**
26+
* Tests the bulk operation via gRPC.
27+
*/
28+
public void testDocumentServiceBulk() throws Exception {
29+
// Create a test index
30+
String indexName = "test-bulk-index";
31+
createTestIndex(indexName);
32+
33+
// Create a gRPC client
34+
try (NettyGrpcClient client = createGrpcClient()) {
35+
// Create a DocumentService stub
36+
ManagedChannel channel = client.getChannel();
37+
DocumentServiceGrpc.DocumentServiceBlockingStub documentStub = DocumentServiceGrpc.newBlockingStub(channel);
38+
39+
// Create a bulk request with an index operation
40+
IndexOperation indexOp = IndexOperation.newBuilder().setIndex(indexName).setId("1").build();
41+
42+
BulkRequestBody requestBody = BulkRequestBody.newBuilder()
43+
.setIndex(indexOp)
44+
.setDoc(com.google.protobuf.ByteString.copyFromUtf8(DEFAULT_DOCUMENT_SOURCE))
45+
.build();
46+
47+
BulkRequest bulkRequest = BulkRequest.newBuilder().addRequestBody(requestBody).build();
48+
49+
// Execute the bulk request
50+
BulkResponse bulkResponse = documentStub.bulk(bulkRequest);
51+
52+
// Verify the response
53+
assertNotNull("Bulk response should not be null", bulkResponse);
54+
assertFalse("Bulk response should not have errors", bulkResponse.getBulkResponseBody().getErrors());
55+
assertEquals("Bulk response should have one item", 1, bulkResponse.getBulkResponseBody().getItemsCount());
56+
57+
// Verify the document is searchable
58+
waitForSearchableDoc(indexName, "1");
59+
}
60+
}
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.transport.grpc;
10+
11+
import org.opensearch.action.index.IndexResponse;
12+
import org.opensearch.common.network.NetworkAddress;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.common.xcontent.XContentType;
15+
import org.opensearch.core.common.transport.TransportAddress;
16+
import org.opensearch.plugin.transport.grpc.ssl.NettyGrpcClient;
17+
import org.opensearch.plugins.Plugin;
18+
import org.opensearch.test.OpenSearchIntegTestCase;
19+
20+
import java.net.InetSocketAddress;
21+
import java.util.ArrayList;
22+
import java.util.Collection;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.concurrent.TimeUnit;
26+
27+
import io.grpc.health.v1.HealthCheckResponse;
28+
29+
import static org.opensearch.plugin.transport.grpc.Netty4GrpcServerTransport.GRPC_TRANSPORT_SETTING_KEY;
30+
import static org.opensearch.plugins.NetworkPlugin.AuxTransport.AUX_TRANSPORT_TYPES_KEY;
31+
32+
/**
33+
* Base test class for gRPC transport integration tests.
34+
*/
35+
public abstract class GrpcTransportBaseIT extends OpenSearchIntegTestCase {
36+
37+
// Common constants
38+
protected static final String DEFAULT_INDEX_NAME = "test-grpc-index";
39+
protected static final String DEFAULT_DOCUMENT_SOURCE = "{\"field1\":\"value1\",\"field2\":42}";
40+
41+
/**
42+
* Gets a random gRPC transport address from the cluster.
43+
*
44+
* @return A random transport address
45+
*/
46+
protected TransportAddress randomNetty4GrpcServerTransportAddr() {
47+
List<TransportAddress> addresses = new ArrayList<>();
48+
for (Netty4GrpcServerTransport transport : internalCluster().getInstances(Netty4GrpcServerTransport.class)) {
49+
TransportAddress tAddr = new TransportAddress(transport.getBoundAddress().publishAddress().address());
50+
addresses.add(tAddr);
51+
}
52+
return randomFrom(addresses);
53+
}
54+
55+
/**
56+
* Configures node settings for gRPC transport.
57+
*/
58+
@Override
59+
protected Settings nodeSettings(int nodeOrdinal) {
60+
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(AUX_TRANSPORT_TYPES_KEY, GRPC_TRANSPORT_SETTING_KEY).build();
61+
}
62+
63+
/**
64+
* Configures plugins for gRPC transport.
65+
*/
66+
@Override
67+
protected Collection<Class<? extends Plugin>> nodePlugins() {
68+
return Collections.singleton(GrpcPlugin.class);
69+
}
70+
71+
/**
72+
* Verifies that the gRPC transport is started on all nodes.
73+
*/
74+
protected void verifyGrpcTransportStarted() {
75+
for (String nodeName : internalCluster().getNodeNames()) {
76+
Netty4GrpcServerTransport transport = internalCluster().getInstance(Netty4GrpcServerTransport.class, nodeName);
77+
assertNotNull("gRPC transport should be started on node " + nodeName, transport);
78+
79+
// Verify that the transport is bound to an address
80+
TransportAddress[] boundAddresses = transport.getBoundAddress().boundAddresses();
81+
assertTrue("gRPC transport should be bound to at least one address", boundAddresses.length > 0);
82+
83+
// Log the bound addresses for debugging
84+
for (TransportAddress address : boundAddresses) {
85+
InetSocketAddress inetAddress = address.address();
86+
logger.info("Node {} gRPC transport bound to {}", nodeName, NetworkAddress.format(inetAddress));
87+
}
88+
}
89+
}
90+
91+
/**
92+
* Creates and ensures a test index is green.
93+
*
94+
* @param indexName The name of the index to create
95+
*/
96+
protected void createTestIndex(String indexName) {
97+
createIndex(indexName);
98+
ensureGreen(indexName);
99+
}
100+
101+
/**
102+
* Creates a gRPC client connected to a random node.
103+
*
104+
* @return A new NettyGrpcClient instance
105+
* @throws javax.net.ssl.SSLException if there's an SSL error
106+
*/
107+
protected NettyGrpcClient createGrpcClient() throws javax.net.ssl.SSLException {
108+
return new NettyGrpcClient.Builder().setAddress(randomNetty4GrpcServerTransportAddr()).build();
109+
}
110+
111+
/**
112+
* Indexes a test document and refreshes the index.
113+
*
114+
* @param indexName The name of the index
115+
* @param id The document ID
116+
* @param source The document source
117+
* @return The IndexResponse
118+
*/
119+
protected IndexResponse indexTestDocument(String indexName, String id, String source) {
120+
IndexResponse response = client().prepareIndex(indexName).setId(id).setSource(source, XContentType.JSON).get();
121+
122+
assertTrue(
123+
"Document should be indexed without shard failures",
124+
response.getShardInfo().getSuccessful() > 0 && response.getShardInfo().getFailed() == 0
125+
);
126+
127+
// Refresh to make document searchable
128+
client().admin().indices().prepareRefresh(indexName).get();
129+
130+
return response;
131+
}
132+
133+
/**
134+
* Waits for a document to become searchable.
135+
*
136+
* @param indexName The name of the index
137+
* @param id The document ID
138+
* @throws Exception If the document doesn't become searchable within the timeout
139+
*/
140+
protected void waitForSearchableDoc(String indexName, String id) throws Exception {
141+
assertBusy(() -> {
142+
refresh(indexName);
143+
assertTrue("Document should be searchable", client().prepareGet(indexName, id).get().isExists());
144+
}, 30, TimeUnit.SECONDS);
145+
}
146+
147+
/**
148+
* Checks the health of the gRPC transport service.
149+
*
150+
* @throws Exception If the health check fails
151+
*/
152+
protected void checkGrpcTransportHealth() throws Exception {
153+
try (NettyGrpcClient client = createGrpcClient()) {
154+
assertEquals(client.checkHealth(), HealthCheckResponse.ServingStatus.SERVING);
155+
}
156+
}
157+
}

plugins/transport-grpc/src/internalClusterTest/java/org/opensearch/plugin/transport/grpc/Netty4GrpcServerTransportIT.java

Lines changed: 18 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -10,70 +10,39 @@
1010

1111
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
1212
import org.opensearch.cluster.health.ClusterHealthStatus;
13-
import org.opensearch.common.network.NetworkAddress;
14-
import org.opensearch.common.settings.Settings;
15-
import org.opensearch.core.common.transport.TransportAddress;
1613
import org.opensearch.plugin.transport.grpc.ssl.NettyGrpcClient;
17-
import org.opensearch.plugins.Plugin;
18-
import org.opensearch.test.OpenSearchIntegTestCase;
19-
20-
import java.net.InetSocketAddress;
21-
import java.util.ArrayList;
22-
import java.util.Collection;
23-
import java.util.Collections;
24-
import java.util.List;
2514

2615
import io.grpc.health.v1.HealthCheckResponse;
2716

28-
import static org.opensearch.plugin.transport.grpc.Netty4GrpcServerTransport.GRPC_TRANSPORT_SETTING_KEY;
29-
import static org.opensearch.plugins.NetworkPlugin.AuxTransport.AUX_TRANSPORT_TYPES_KEY;
30-
31-
public class Netty4GrpcServerTransportIT extends OpenSearchIntegTestCase {
32-
33-
private TransportAddress randomNetty4GrpcServerTransportAddr() {
34-
List<TransportAddress> addresses = new ArrayList<>();
35-
for (Netty4GrpcServerTransport transport : internalCluster().getInstances(Netty4GrpcServerTransport.class)) {
36-
TransportAddress tAddr = new TransportAddress(transport.getBoundAddress().publishAddress().address());
37-
addresses.add(tAddr);
38-
}
39-
return randomFrom(addresses);
40-
}
41-
42-
@Override
43-
protected Settings nodeSettings(int nodeOrdinal) {
44-
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(AUX_TRANSPORT_TYPES_KEY, GRPC_TRANSPORT_SETTING_KEY).build();
45-
}
46-
47-
@Override
48-
protected Collection<Class<? extends Plugin>> nodePlugins() {
49-
return Collections.singleton(GrpcPlugin.class);
50-
}
17+
/**
18+
* Integration tests for the gRPC transport itself.
19+
*/
20+
public class Netty4GrpcServerTransportIT extends GrpcTransportBaseIT {
5121

22+
/**
23+
* Tests that the gRPC transport is properly started.
24+
*/
5225
public void testGrpcTransportStarted() {
53-
// Verify that the gRPC transport is started on all nodes
54-
for (String nodeName : internalCluster().getNodeNames()) {
55-
Netty4GrpcServerTransport transport = internalCluster().getInstance(Netty4GrpcServerTransport.class, nodeName);
56-
assertNotNull("gRPC transport should be started on node " + nodeName, transport);
57-
58-
// Verify that the transport is bound to an address
59-
TransportAddress[] boundAddresses = transport.getBoundAddress().boundAddresses();
60-
assertTrue("gRPC transport should be bound to at least one address", boundAddresses.length > 0);
26+
verifyGrpcTransportStarted();
27+
}
6128

62-
// Log the bound addresses for debugging
63-
for (TransportAddress address : boundAddresses) {
64-
InetSocketAddress inetAddress = address.address();
65-
logger.info("Node {} gRPC transport bound to {}", nodeName, NetworkAddress.format(inetAddress));
66-
}
67-
}
29+
/**
30+
* Tests the health of the gRPC transport service.
31+
*/
32+
public void testGrpcTransportHealth() throws Exception {
33+
checkGrpcTransportHealth();
6834
}
6935

36+
/**
37+
* Tests both REST API cluster health and gRPC transport service health.
38+
*/
7039
public void testStartGrpcTransportClusterHealth() throws Exception {
7140
// REST api cluster health
7241
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().get();
7342
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());
7443

7544
// gRPC transport service health
76-
try (NettyGrpcClient client = new NettyGrpcClient.Builder().setAddress(randomNetty4GrpcServerTransportAddr()).build()) {
45+
try (NettyGrpcClient client = createGrpcClient()) {
7746
assertEquals(client.checkHealth(), HealthCheckResponse.ServingStatus.SERVING);
7847
}
7948
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.transport.grpc;
10+
11+
import org.opensearch.plugin.transport.grpc.ssl.NettyGrpcClient;
12+
import org.opensearch.protobufs.SearchRequest;
13+
import org.opensearch.protobufs.SearchRequestBody;
14+
import org.opensearch.protobufs.SearchResponse;
15+
import org.opensearch.protobufs.services.SearchServiceGrpc;
16+
17+
import io.grpc.ManagedChannel;
18+
19+
/**
20+
* Integration tests for the SearchService gRPC service.
21+
*/
22+
public class SearchServiceIT extends GrpcTransportBaseIT {
23+
24+
/**
25+
* Tests the search operation via gRPC.
26+
*/
27+
public void testSearchServiceSearch() throws Exception {
28+
// Create a test index
29+
String indexName = "test-search-index";
30+
createTestIndex(indexName);
31+
32+
// Add a document to the index
33+
indexTestDocument(indexName, "1", DEFAULT_DOCUMENT_SOURCE);
34+
35+
// Create a gRPC client
36+
try (NettyGrpcClient client = createGrpcClient()) {
37+
// Create a SearchService stub
38+
ManagedChannel channel = client.getChannel();
39+
SearchServiceGrpc.SearchServiceBlockingStub searchStub = SearchServiceGrpc.newBlockingStub(channel);
40+
41+
// Create a search request
42+
SearchRequestBody requestBody = SearchRequestBody.newBuilder().setFrom(0).setSize(10).build();
43+
44+
SearchRequest searchRequest = SearchRequest.newBuilder()
45+
.addIndex(indexName)
46+
.setRequestBody(requestBody)
47+
.setQ("field1:value1")
48+
.build();
49+
50+
// Execute the search request
51+
SearchResponse searchResponse = searchStub.search(searchRequest);
52+
53+
// Verify the response
54+
assertNotNull("Search response should not be null", searchResponse);
55+
assertTrue(
56+
"Search response should have hits",
57+
searchResponse.getResponseBody().getHits().getTotal().getTotalHits().getValue() > 0
58+
);
59+
assertEquals("Search response should have one hit", 1, searchResponse.getResponseBody().getHits().getHitsCount());
60+
assertEquals("Hit should have correct ID", "1", searchResponse.getResponseBody().getHits().getHits(0).getId());
61+
}
62+
}
63+
}

plugins/transport-grpc/src/test/java/org/opensearch/plugin/transport/grpc/ssl/NettyGrpcClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,14 @@ public HealthCheckResponse.ServingStatus checkHealth() {
110110
return healthStub.check(HealthCheckRequest.newBuilder().build()).getStatus();
111111
}
112112

113+
/**
114+
* Get the managed channel for creating service stubs.
115+
* @return The managed channel.
116+
*/
117+
public ManagedChannel getChannel() {
118+
return channel;
119+
}
120+
113121
public static class Builder {
114122
private Boolean clientAuth = false;
115123
private Boolean insecure = false;

0 commit comments

Comments
 (0)