33
33
34
34
import org .opensearch .client .Client ;
35
35
import org .opensearch .common .annotation .PublicApi ;
36
+ import org .opensearch .common .util .concurrent .CountDown ;
36
37
import org .opensearch .core .action .ActionListener ;
37
38
import org .opensearch .core .common .io .stream .NamedWriteableRegistry ;
38
39
import org .opensearch .core .xcontent .NamedXContentRegistry ;
39
40
import org .opensearch .core .xcontent .XContentParser ;
40
41
42
+ import java .util .ArrayList ;
43
+ import java .util .List ;
41
44
import java .util .function .BiConsumer ;
45
+ import java .util .function .LongSupplier ;
42
46
43
47
/**
44
48
* Context object used to rewrite {@link QueryBuilder} instances into simplified version.
45
49
*
46
50
* @opensearch.api
47
51
*/
48
52
@ PublicApi (since = "1.0.0" )
49
- public interface QueryRewriteContext {
53
+ public class QueryRewriteContext {
54
+ private final NamedXContentRegistry xContentRegistry ;
55
+ private final NamedWriteableRegistry writeableRegistry ;
56
+ protected final Client client ;
57
+ protected final LongSupplier nowInMillis ;
58
+ private final List <BiConsumer <Client , ActionListener <?>>> asyncActions = new ArrayList <>();
59
+ private final boolean validate ;
60
+
61
+ public QueryRewriteContext (
62
+ NamedXContentRegistry xContentRegistry ,
63
+ NamedWriteableRegistry writeableRegistry ,
64
+ Client client ,
65
+ LongSupplier nowInMillis
66
+ ) {
67
+ this (xContentRegistry , writeableRegistry , client , nowInMillis , false );
68
+ }
69
+
70
+ public QueryRewriteContext (
71
+ NamedXContentRegistry xContentRegistry ,
72
+ NamedWriteableRegistry writeableRegistry ,
73
+ Client client ,
74
+ LongSupplier nowInMillis ,
75
+ boolean validate
76
+ ) {
77
+
78
+ this .xContentRegistry = xContentRegistry ;
79
+ this .writeableRegistry = writeableRegistry ;
80
+ this .client = client ;
81
+ this .nowInMillis = nowInMillis ;
82
+ this .validate = validate ;
83
+ }
84
+
50
85
/**
51
86
* The registry used to build new {@link XContentParser}s. Contains registered named parsers needed to parse the query.
52
87
*/
53
- NamedXContentRegistry getXContentRegistry ();
88
+ public NamedXContentRegistry getXContentRegistry () {
89
+ return xContentRegistry ;
90
+ }
54
91
55
92
/**
56
93
* Returns the time in milliseconds that is shared across all resources involved. Even across shards and nodes.
57
94
*/
58
- long nowInMillis ();
95
+ public long nowInMillis () {
96
+ return nowInMillis .getAsLong ();
97
+ }
59
98
60
- NamedWriteableRegistry getWriteableRegistry ();
99
+ public NamedWriteableRegistry getWriteableRegistry () {
100
+ return writeableRegistry ;
101
+ }
61
102
62
103
/**
63
- * Returns an instance of {@link QueryShardContext} if available of null otherwise
104
+ * Returns an instance of {@link QueryShardContext} if available or null otherwise
64
105
*/
65
- default QueryShardContext convertToShardContext () {
106
+ public QueryShardContext convertToShardContext () {
66
107
return null ;
67
108
}
68
109
69
- default QueryCoordinatorContext convertToCoordinatorContext () {
110
+ /**
111
+ * Returns an instance of {@link QueryCoordinatorContext} if available or null otherwise
112
+ * @return
113
+ */
114
+ public QueryCoordinatorContext convertToCoordinatorContext () {
70
115
return null ;
71
116
}
72
117
@@ -75,18 +120,52 @@ default QueryCoordinatorContext convertToCoordinatorContext() {
75
120
* This should be used if a rewriteabel needs to fetch some external resources in order to be executed ie. a document
76
121
* from an index.
77
122
*/
78
- void registerAsyncAction (BiConsumer <Client , ActionListener <?>> asyncAction );
123
+ public void registerAsyncAction (BiConsumer <Client , ActionListener <?>> asyncAction ) {
124
+ asyncActions .add (asyncAction );
125
+ }
79
126
80
127
/**
81
128
* Returns <code>true</code> if there are any registered async actions.
82
129
*/
83
- boolean hasAsyncActions ();
130
+ public boolean hasAsyncActions () {
131
+ return asyncActions .isEmpty () == false ;
132
+ }
84
133
85
134
/**
86
135
* Executes all registered async actions and notifies the listener once it's done. The value that is passed to the listener is always
87
136
* <code>null</code>. The list of registered actions is cleared once this method returns.
88
137
*/
89
- void executeAsyncActions (ActionListener listener );
138
+ public void executeAsyncActions (ActionListener listener ) {
139
+ if (asyncActions .isEmpty ()) {
140
+ listener .onResponse (null );
141
+ return ;
142
+ }
90
143
91
- boolean validate ();
144
+ CountDown countDown = new CountDown (asyncActions .size ());
145
+ ActionListener <?> internalListener = new ActionListener () {
146
+ @ Override
147
+ public void onResponse (Object o ) {
148
+ if (countDown .countDown ()) {
149
+ listener .onResponse (null );
150
+ }
151
+ }
152
+
153
+ @ Override
154
+ public void onFailure (Exception e ) {
155
+ if (countDown .fastForward ()) {
156
+ listener .onFailure (e );
157
+ }
158
+ }
159
+ };
160
+ // make a copy to prevent concurrent modification exception
161
+ List <BiConsumer <Client , ActionListener <?>>> biConsumers = new ArrayList <>(asyncActions );
162
+ asyncActions .clear ();
163
+ for (BiConsumer <Client , ActionListener <?>> action : biConsumers ) {
164
+ action .accept (client , internalListener );
165
+ }
166
+ }
167
+
168
+ public boolean validate () {
169
+ return validate ;
170
+ }
92
171
}
0 commit comments