9
9
import com .carrotsearch .hppc .cursors .ObjectObjectCursor ;
10
10
import com .google .common .collect .ImmutableList ;
11
11
import com .google .common .collect .ImmutableMap ;
12
- import java . io . IOException ;
12
+ import com . google . common . collect . Streams ;
13
13
import java .util .Arrays ;
14
14
import java .util .Collection ;
15
15
import java .util .List ;
18
18
import java .util .function .Predicate ;
19
19
import java .util .stream .Collectors ;
20
20
import java .util .stream .Stream ;
21
- import org .apache .logging .log4j .ThreadContext ;
22
21
import org .opensearch .action .admin .indices .get .GetIndexResponse ;
23
- import org .opensearch .action .support .IndicesOptions ;
22
+ import org .opensearch .action .admin .indices .mapping .get .GetMappingsResponse ;
23
+ import org .opensearch .action .admin .indices .settings .get .GetSettingsResponse ;
24
24
import org .opensearch .client .node .NodeClient ;
25
- import org .opensearch .cluster .ClusterState ;
26
25
import org .opensearch .cluster .metadata .AliasMetadata ;
27
- import org .opensearch .cluster .metadata .IndexMetadata ;
28
26
import org .opensearch .cluster .metadata .IndexNameExpressionResolver ;
29
- import org .opensearch .cluster .metadata .MappingMetadata ;
30
- import org .opensearch .cluster .service .ClusterService ;
31
- import org .opensearch .common .collect .ImmutableOpenMap ;
32
27
import org .opensearch .common .settings .Settings ;
33
- import org .opensearch .common .unit .TimeValue ;
34
28
import org .opensearch .index .IndexSettings ;
35
29
import org .opensearch .sql .opensearch .mapping .IndexMapping ;
36
30
import org .opensearch .sql .opensearch .request .OpenSearchRequest ;
37
31
import org .opensearch .sql .opensearch .response .OpenSearchResponse ;
38
- import org .opensearch .threadpool .ThreadPool ;
39
32
40
33
/** OpenSearch connection by node client. */
41
34
public class OpenSearchNodeClient implements OpenSearchClient {
42
35
43
36
public static final Function <String , Predicate <String >> ALL_FIELDS =
44
37
(anyIndex -> (anyField -> true ));
45
38
46
- /** Current cluster state on local node. */
47
- private final ClusterService clusterService ;
48
-
49
39
/** Node client provided by OpenSearch container. */
50
40
private final NodeClient client ;
51
41
52
42
/** Index name expression resolver to get concrete index name. */
53
43
private final IndexNameExpressionResolver resolver ;
54
44
55
- private static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker" ;
56
-
57
45
/**
58
46
* Constructor of ElasticsearchNodeClient.
59
47
*/
60
- public OpenSearchNodeClient (ClusterService clusterService ,
61
- NodeClient client ) {
62
- this .clusterService = clusterService ;
48
+ public OpenSearchNodeClient (NodeClient client ) {
63
49
this .client = client ;
64
50
this .resolver = new IndexNameExpressionResolver (client .threadPool ().getThreadContext ());
65
51
}
@@ -78,14 +64,16 @@ public OpenSearchNodeClient(ClusterService clusterService,
78
64
@ Override
79
65
public Map <String , IndexMapping > getIndexMappings (String ... indexExpression ) {
80
66
try {
81
- ClusterState state = clusterService .state ();
82
- String [] concreteIndices = resolveIndexExpression (state , indexExpression );
83
-
84
- return populateIndexMappings (
85
- state .metadata ().findMappings (concreteIndices , ALL_FIELDS ));
86
- } catch (IOException e ) {
67
+ GetMappingsResponse mappingsResponse = client .admin ().indices ()
68
+ .prepareGetMappings (indexExpression )
69
+ .setLocal (true )
70
+ .get ();
71
+ return Streams .stream (mappingsResponse .mappings ().iterator ())
72
+ .collect (Collectors .toMap (cursor -> cursor .key ,
73
+ cursor -> new IndexMapping (cursor .value )));
74
+ } catch (Exception e ) {
87
75
throw new IllegalStateException (
88
- "Failed to read mapping in cluster state for index pattern [" + indexExpression + "]" , e );
76
+ "Failed to read mapping for index pattern [" + indexExpression + "]" , e );
89
77
}
90
78
}
91
79
@@ -97,19 +85,24 @@ public Map<String, IndexMapping> getIndexMappings(String... indexExpression) {
97
85
*/
98
86
@ Override
99
87
public Map <String , Integer > getIndexMaxResultWindows (String ... indexExpression ) {
100
- ClusterState state = clusterService .state ();
101
- ImmutableOpenMap <String , IndexMetadata > indicesMetadata = state .metadata ().getIndices ();
102
- String [] concreteIndices = resolveIndexExpression (state , indexExpression );
103
-
104
- ImmutableMap .Builder <String , Integer > result = ImmutableMap .builder ();
105
- for (String index : concreteIndices ) {
106
- Settings settings = indicesMetadata .get (index ).getSettings ();
107
- Integer maxResultWindow = settings .getAsInt ("index.max_result_window" ,
108
- IndexSettings .MAX_RESULT_WINDOW_SETTING .getDefault (settings ));
109
- result .put (index , maxResultWindow );
88
+ try {
89
+ GetSettingsResponse settingsResponse =
90
+ client .admin ().indices ().prepareGetSettings (indexExpression ).setLocal (true ).get ();
91
+ ImmutableMap .Builder <String , Integer > result = ImmutableMap .builder ();
92
+ for (ObjectObjectCursor <String , Settings > indexToSetting :
93
+ settingsResponse .getIndexToSettings ()) {
94
+ Settings settings = indexToSetting .value ;
95
+ result .put (
96
+ indexToSetting .key ,
97
+ settings .getAsInt (
98
+ IndexSettings .MAX_RESULT_WINDOW_SETTING .getKey (),
99
+ IndexSettings .MAX_RESULT_WINDOW_SETTING .getDefault (settings )));
100
+ }
101
+ return result .build ();
102
+ } catch (Exception e ) {
103
+ throw new IllegalStateException (
104
+ "Failed to read setting for index pattern [" + indexExpression + "]" , e );
110
105
}
111
-
112
- return result .build ();
113
106
}
114
107
115
108
/**
@@ -149,9 +142,8 @@ public List<String> indices() {
149
142
*/
150
143
@ Override
151
144
public Map <String , String > meta () {
152
- final ImmutableMap .Builder <String , String > builder = new ImmutableMap .Builder <>();
153
- builder .put (META_CLUSTER_NAME , clusterService .getClusterName ().value ());
154
- return builder .build ();
145
+ return ImmutableMap .of (META_CLUSTER_NAME ,
146
+ client .settings ().get ("cluster.name" , "opensearch" ));
155
147
}
156
148
157
149
@ Override
@@ -161,40 +153,12 @@ public void cleanup(OpenSearchRequest request) {
161
153
162
154
@ Override
163
155
public void schedule (Runnable task ) {
164
- ThreadPool threadPool = client .threadPool ();
165
- threadPool .schedule (
166
- withCurrentContext (task ),
167
- new TimeValue (0 ),
168
- SQL_WORKER_THREAD_POOL_NAME
169
- );
156
+ // at that time, task already running the sql-worker ThreadPool.
157
+ task .run ();
170
158
}
171
159
172
160
@ Override
173
161
public NodeClient getNodeClient () {
174
162
return client ;
175
163
}
176
-
177
- private String [] resolveIndexExpression (ClusterState state , String [] indices ) {
178
- return resolver .concreteIndexNames (state , IndicesOptions .strictExpandOpen (), true , indices );
179
- }
180
-
181
- private Map <String , IndexMapping > populateIndexMappings (
182
- ImmutableOpenMap <String , MappingMetadata > indexMappings ) {
183
-
184
- ImmutableMap .Builder <String , IndexMapping > result = ImmutableMap .builder ();
185
- for (ObjectObjectCursor <String , MappingMetadata > cursor :
186
- indexMappings ) {
187
- result .put (cursor .key , new IndexMapping (cursor .value ));
188
- }
189
- return result .build ();
190
- }
191
-
192
- /** Copy from LogUtils. */
193
- private static Runnable withCurrentContext (final Runnable task ) {
194
- final Map <String , String > currentContext = ThreadContext .getImmutableContext ();
195
- return () -> {
196
- ThreadContext .putAll (currentContext );
197
- task .run ();
198
- };
199
- }
200
164
}
0 commit comments