Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.

Commit 3a740ad

Browse files
authored
[PPL] Add ES rest client to support standalone mode (#484)
* Add standalone IT and rest client impl * Add standalone IT and rest client impl * Add rest client UT * Add more UT * Add cleanup method * Add assertion for cleanup method * Avoid cleanup multiple times * Don't cleanup at last page automatically * More comments * UT covers all branches
1 parent 9ac0819 commit 3a740ad

File tree

13 files changed

+543
-28
lines changed

13 files changed

+543
-28
lines changed

elasticsearch/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ repositories {
1111
dependencies {
1212
compile project(':core')
1313
compile group: 'org.elasticsearch', name: 'elasticsearch', version: "${es_version}"
14+
compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version:"${es_version}"
1415

1516
testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
1617
testCompile group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'

elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/client/ElasticsearchClient.java

+6
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ public interface ElasticsearchClient {
4242
*/
4343
ElasticsearchResponse search(ElasticsearchRequest request);
4444

45+
/**
46+
* Clean up resources related to the search request, for example scroll context.
47+
* @param request search request
48+
*/
49+
void cleanup(ElasticsearchRequest request);
50+
4551
/**
4652
* Schedule a task to run.
4753
* @param task task

elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/client/ElasticsearchNodeClient.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -101,16 +101,20 @@ public ElasticsearchResponse search(ElasticsearchRequest request) {
101101
esResponse = client.searchScroll(request.scrollRequest()).actionGet();
102102
} else {
103103
esResponse = client.search(request.searchRequest()).actionGet();
104-
request.setScrollId(esResponse.getScrollId());
105104
}
105+
request.setScrollId(esResponse.getScrollId());
106106

107-
ElasticsearchResponse response = new ElasticsearchResponse(esResponse);
108-
if (response.isEmpty()) {
107+
return new ElasticsearchResponse(esResponse);
108+
}
109+
110+
@Override
111+
public void cleanup(ElasticsearchRequest request) {
112+
if (request.isScrollStarted()) {
109113
client.prepareClearScroll().
110-
addScrollId(esResponse.getScrollId()).
114+
addScrollId(request.getScrollId()).
111115
get();
116+
request.reset();
112117
}
113-
return response;
114118
}
115119

116120
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.amazon.opendistroforelasticsearch.sql.elasticsearch.client;
18+
19+
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.mapping.IndexMapping;
20+
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.request.ElasticsearchRequest;
21+
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.response.ElasticsearchResponse;
22+
import lombok.RequiredArgsConstructor;
23+
import org.elasticsearch.action.search.ClearScrollRequest;
24+
import org.elasticsearch.action.search.SearchResponse;
25+
import org.elasticsearch.client.RequestOptions;
26+
import org.elasticsearch.client.RestHighLevelClient;
27+
import org.elasticsearch.client.indices.GetMappingsRequest;
28+
import org.elasticsearch.client.indices.GetMappingsResponse;
29+
30+
import java.io.IOException;
31+
import java.util.Map;
32+
import java.util.stream.Collectors;
33+
34+
/**
35+
* Elasticsearch REST client to support standalone mode that runs entire engine from remote.
36+
*
37+
* TODO: Support for authN and authZ with AWS Sigv4 or security plugin.
38+
*/
39+
@RequiredArgsConstructor
40+
public class ElasticsearchRestClient implements ElasticsearchClient {
41+
42+
/**
43+
* Elasticsearch high level REST client
44+
*/
45+
private final RestHighLevelClient client;
46+
47+
48+
@Override
49+
public Map<String, IndexMapping> getIndexMappings(String indexExpression) {
50+
GetMappingsRequest request = new GetMappingsRequest().indices(indexExpression);
51+
try {
52+
GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT);
53+
return response.mappings().entrySet().stream().
54+
collect(Collectors.toMap(
55+
Map.Entry::getKey,
56+
e -> new IndexMapping(e.getValue()))
57+
);
58+
} catch (IOException e) {
59+
throw new IllegalStateException(
60+
"Failed to get index mappings for " + indexExpression, e);
61+
}
62+
}
63+
64+
@Override
65+
public ElasticsearchResponse search(ElasticsearchRequest request) {
66+
try {
67+
SearchResponse esResponse;
68+
if (request.isScrollStarted()) {
69+
esResponse = client.scroll(request.scrollRequest(), RequestOptions.DEFAULT);
70+
} else {
71+
esResponse = client.search(request.searchRequest(), RequestOptions.DEFAULT);
72+
}
73+
request.setScrollId(esResponse.getScrollId());
74+
75+
return new ElasticsearchResponse(esResponse);
76+
} catch (IOException e) {
77+
throw new IllegalStateException(
78+
"Failed to perform search operation with request " + request, e);
79+
}
80+
}
81+
82+
@Override
83+
public void cleanup(ElasticsearchRequest request) {
84+
try {
85+
if (!request.isScrollStarted()) {
86+
return;
87+
}
88+
89+
ClearScrollRequest clearRequest = new ClearScrollRequest();
90+
clearRequest.addScrollId(request.getScrollId());
91+
client.clearScroll(clearRequest, RequestOptions.DEFAULT);
92+
} catch (IOException e) {
93+
throw new IllegalStateException(
94+
"Failed to clean up resources for search request " + request, e);
95+
} finally {
96+
request.reset();
97+
}
98+
}
99+
100+
@Override
101+
public void schedule(Runnable task) {
102+
task.run();
103+
}
104+
105+
}

elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/request/ElasticsearchRequest.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,14 @@
2929
import java.util.Objects;
3030

3131
/**
32-
* Elasticsearch search request
32+
* Elasticsearch search request. This has to be stateful because it needs to:
33+
*
34+
* 1) Accumulate search source builder when visiting logical plan to push down operation
35+
* 2) Maintain scroll ID between calls to client search method
3336
*/
3437
@EqualsAndHashCode
3538
@RequiredArgsConstructor
39+
@Getter
3640
@ToString
3741
public class ElasticsearchRequest {
3842

@@ -56,7 +60,6 @@ public class ElasticsearchRequest {
5660
/**
5761
* Search request source builder.
5862
*/
59-
@Getter
6063
private final SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
6164

6265
/**
@@ -87,4 +90,12 @@ public SearchScrollRequest scrollRequest() {
8790
scrollId(scrollId);
8891
}
8992

93+
/**
94+
* Reset internal state in case any stale data. However, ideally the same instance
95+
* is not supposed to be reused across different physical plan.
96+
*/
97+
public void reset() {
98+
scrollId = null;
99+
}
100+
90101
}

elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexScan.java

+9
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public ElasticsearchIndexScan(ElasticsearchClient client, String indexName) {
6363

6464
@Override
6565
public void open() {
66+
super.open();
67+
6668
// For now pull all results immediately once open
6769
List<ElasticsearchResponse> responses = new ArrayList<>();
6870
ElasticsearchResponse response = client.search(request);
@@ -83,4 +85,11 @@ public ExprValue next() {
8385
return ExprValueUtils.fromObjectValue(hits.next().getSourceAsMap());
8486
}
8587

88+
@Override
89+
public void close() {
90+
super.close();
91+
92+
client.cleanup(request);
93+
}
94+
8695
}

elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/client/ElasticsearchNodeClientTest.java

+37-6
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.common.collect.ImmutableSortedMap;
2424
import com.google.common.io.Resources;
2525
import org.apache.lucene.search.TotalHits;
26+
import org.elasticsearch.action.search.ClearScrollRequestBuilder;
2627
import org.elasticsearch.action.search.SearchResponse;
2728
import org.elasticsearch.client.node.NodeClient;
2829
import org.elasticsearch.cluster.ClusterState;
@@ -32,7 +33,6 @@
3233
import org.elasticsearch.cluster.metadata.MetaData;
3334
import org.elasticsearch.cluster.service.ClusterService;
3435
import org.elasticsearch.common.collect.ImmutableOpenMap;
35-
import org.elasticsearch.common.settings.Settings;
3636
import org.elasticsearch.common.xcontent.DeprecationHandler;
3737
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3838
import org.elasticsearch.common.xcontent.XContentParser;
@@ -43,7 +43,9 @@
4343
import org.elasticsearch.threadpool.ThreadPool;
4444
import org.junit.jupiter.api.Test;
4545
import org.junit.jupiter.api.extension.ExtendWith;
46+
import org.mockito.InOrder;
4647
import org.mockito.Mock;
48+
import org.mockito.Mockito;
4749
import org.mockito.junit.jupiter.MockitoExtension;
4850

4951
import java.io.IOException;
@@ -60,6 +62,8 @@
6062
import static org.mockito.Mockito.any;
6163
import static org.mockito.Mockito.doAnswer;
6264
import static org.mockito.Mockito.mock;
65+
import static org.mockito.Mockito.never;
66+
import static org.mockito.Mockito.verify;
6367
import static org.mockito.Mockito.when;
6468

6569
@ExtendWith(MockitoExtension.class)
@@ -156,9 +160,6 @@ public void search() {
156160
when(scrollResponse.getScrollId()).thenReturn("scroll456");
157161
when(scrollResponse.getHits()).thenReturn(SearchHits.empty());
158162

159-
// Mock clear scroll request
160-
when(nodeClient.prepareClearScroll().addScrollId("scroll456").get()).thenReturn(null);
161-
162163
// Verify response for first scroll request
163164
ElasticsearchRequest request = new ElasticsearchRequest("test");
164165
ElasticsearchResponse response1 = client.search(request);
@@ -178,9 +179,8 @@ public void search() {
178179
void schedule() {
179180
ThreadPool threadPool = mock(ThreadPool.class);
180181
when(threadPool.preserveContext(any())).then(invocation -> invocation.getArgument(0));
182+
when(nodeClient.threadPool()).thenReturn(threadPool);
181183

182-
// Instantiate NodeClient because Mockito cannot mock final method threadPool()
183-
nodeClient = new NodeClient(Settings.EMPTY, threadPool);
184184
doAnswer(invocation -> {
185185
Runnable task = invocation.getArgument(0);
186186
task.run();
@@ -194,6 +194,37 @@ void schedule() {
194194
assertTrue(isRun.get());
195195
}
196196

197+
@Test
198+
void cleanup() {
199+
ElasticsearchNodeClient client = new ElasticsearchNodeClient(mock(ClusterService.class),
200+
nodeClient);
201+
202+
ClearScrollRequestBuilder requestBuilder = mock(ClearScrollRequestBuilder.class);
203+
when(nodeClient.prepareClearScroll()).thenReturn(requestBuilder);
204+
when(requestBuilder.addScrollId(any())).thenReturn(requestBuilder);
205+
when(requestBuilder.get()).thenReturn(null);
206+
207+
ElasticsearchRequest request = new ElasticsearchRequest("test");
208+
request.setScrollId("scroll123");
209+
client.cleanup(request);
210+
assertFalse(request.isScrollStarted());
211+
212+
InOrder inOrder = Mockito.inOrder(nodeClient, requestBuilder);
213+
inOrder.verify(nodeClient).prepareClearScroll();
214+
inOrder.verify(requestBuilder).addScrollId("scroll123");
215+
inOrder.verify(requestBuilder).get();
216+
}
217+
218+
@Test
219+
void cleanupWithoutScrollId() {
220+
ElasticsearchNodeClient client = new ElasticsearchNodeClient(mock(ClusterService.class),
221+
nodeClient);
222+
223+
ElasticsearchRequest request = new ElasticsearchRequest("test");
224+
client.cleanup(request);
225+
verify(nodeClient, never()).prepareClearScroll();
226+
}
227+
197228
private ElasticsearchNodeClient mockClient(String indexName, String mappings) {
198229
ClusterService clusterService = mockClusterService(indexName, mappings);
199230
return new ElasticsearchNodeClient(clusterService, nodeClient);

0 commit comments

Comments
 (0)