Skip to content

Commit 066e429

Browse files
authored
[BugFix] Prevent push down limit with offset reach maxResultWindow (#3713)
* [BugFix] Prevent push down limit with offset no less than maxResultWindow Signed-off-by: Heng Qian <[email protected]> * Address comments Signed-off-by: Heng Qian <[email protected]> --------- Signed-off-by: Heng Qian <[email protected]>
1 parent 159b60e commit 066e429

File tree

10 files changed

+154
-90
lines changed

10 files changed

+154
-90
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
setup:
2+
- skip:
3+
features:
4+
- headers
5+
- do:
6+
indices.create:
7+
index: test
8+
body:
9+
settings:
10+
max_result_window: 1
11+
- do:
12+
bulk:
13+
index: test
14+
refresh: true
15+
body:
16+
- '{"index": {}}'
17+
- '{"id": 1}'
18+
- '{"index": {}}'
19+
- '{"id": 2}'
20+
- '{"index": {}}'
21+
- '{"id": 3}'
22+
23+
---
24+
"Prevent push down limit if the offset reach max_result_window":
25+
- do:
26+
headers:
27+
Content-Type: 'application/json'
28+
ppl:
29+
body:
30+
query: 'source=test | head 1 from 1 '
31+
- match: {"total": 1}
32+
- match: {"schema": [{"name": "id", "type": "bigint"}]}
33+
- match: {"datarows": [[2]]}
34+
35+
- do:
36+
headers:
37+
Content-Type: 'application/json'
38+
ppl:
39+
body:
40+
query: 'source=test | head 2 | head 1 from 1 '
41+
- match: { "total": 1 }
42+
- match: { "schema": [ { "name": "id", "type": "bigint" } ] }
43+
- match: { "datarows": [ [ 2 ] ] }

opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,23 @@ public class OpenSearchRequestBuilder {
6868
@EqualsAndHashCode.Exclude @ToString.Exclude
6969
private final OpenSearchExprValueFactory exprValueFactory;
7070

71+
@EqualsAndHashCode.Exclude @ToString.Exclude private final int maxResultWindow;
72+
7173
private int startFrom = 0;
7274

7375
@ToString.Exclude private final Settings settings;
7476

77+
public static class PushDownUnSupportedException extends RuntimeException {
78+
public PushDownUnSupportedException(String message) {
79+
super(message);
80+
}
81+
}
82+
7583
/** Constructor. */
76-
public OpenSearchRequestBuilder(OpenSearchExprValueFactory exprValueFactory, Settings settings) {
84+
public OpenSearchRequestBuilder(
85+
OpenSearchExprValueFactory exprValueFactory, int maxResultWindow, Settings settings) {
7786
this.settings = settings;
87+
this.maxResultWindow = maxResultWindow;
7888
this.sourceBuilder =
7989
new SearchSourceBuilder()
8090
.from(startFrom)
@@ -89,16 +99,12 @@ public OpenSearchRequestBuilder(OpenSearchExprValueFactory exprValueFactory, Set
8999
* @return query request with PIT or scroll request
90100
*/
91101
public OpenSearchRequest build(
92-
OpenSearchRequest.IndexName indexName,
93-
int maxResultWindow,
94-
TimeValue cursorKeepAlive,
95-
OpenSearchClient client) {
96-
return build(indexName, maxResultWindow, cursorKeepAlive, client, false);
102+
OpenSearchRequest.IndexName indexName, TimeValue cursorKeepAlive, OpenSearchClient client) {
103+
return build(indexName, cursorKeepAlive, client, false);
97104
}
98105

99106
public OpenSearchRequest build(
100107
OpenSearchRequest.IndexName indexName,
101-
int maxResultWindow,
102108
TimeValue cursorKeepAlive,
103109
OpenSearchClient client,
104110
boolean isMappingEmpty) {
@@ -109,14 +115,11 @@ public OpenSearchRequest build(
109115
if (sourceBuilder.size() == 0 || isMappingEmpty) {
110116
return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, List.of());
111117
}
112-
return buildRequestWithPit(indexName, maxResultWindow, cursorKeepAlive, client);
118+
return buildRequestWithPit(indexName, cursorKeepAlive, client);
113119
}
114120

115121
private OpenSearchRequest buildRequestWithPit(
116-
OpenSearchRequest.IndexName indexName,
117-
int maxResultWindow,
118-
TimeValue cursorKeepAlive,
119-
OpenSearchClient client) {
122+
OpenSearchRequest.IndexName indexName, TimeValue cursorKeepAlive, OpenSearchClient client) {
120123
int size = requestedTotalSize;
121124
FetchSourceContext fetchSource = this.sourceBuilder.fetchSource();
122125
List<String> includes = fetchSource != null ? Arrays.asList(fetchSource.includes()) : List.of();
@@ -218,10 +221,20 @@ public void pushDownLimit(Integer limit, Integer offset) {
218221
// Besides, there may be cases when the existing requestedTotalSize does not satisfy the
219222
// new limit and offset. E.g. for `head 11 | head 10 from 2`, the new requested total size
220223
// is 9. We need to adjust it accordingly.
221-
requestedTotalSize = Math.min(limit, requestedTotalSize - offset);
224+
int newRequestedTotalSize = Math.min(limit, requestedTotalSize - offset);
222225
// If there are multiple offset, we aggregate the offset
223226
// E.g. for `head 10 from 1 | head 5 from 2` equals to `head 5 from 3`
224-
startFrom += offset;
227+
int newStartFrom = startFrom + offset;
228+
229+
if (newStartFrom >= maxResultWindow) {
230+
throw new PushDownUnSupportedException(
231+
String.format(
232+
"Requested offset %d should be less than the max result window %d",
233+
newStartFrom, maxResultWindow));
234+
}
235+
236+
requestedTotalSize = newRequestedTotalSize;
237+
startFrom = newStartFrom;
225238
sourceBuilder.from(startFrom).size(requestedTotalSize);
226239
}
227240

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,7 @@ public TableScanBuilder createScanBuilder() {
207207
client,
208208
requestBuilder.getMaxResponseSize(),
209209
requestBuilder.build(
210-
indexName,
211-
getMaxResultWindow(),
212-
cursorKeepAlive,
213-
client,
214-
cachedFieldOpenSearchTypes.isEmpty()));
210+
indexName, cursorKeepAlive, client, cachedFieldOpenSearchTypes.isEmpty()));
215211
return new OpenSearchIndexScanBuilder(builder, createScanOperator);
216212
}
217213

@@ -260,7 +256,7 @@ public PhysicalPlan visitEval(LogicalEval node, OpenSearchIndexScan context) {
260256
}
261257

262258
public OpenSearchRequestBuilder createRequestBuilder() {
263-
return new OpenSearchRequestBuilder(createExprValueFactory(), settings);
259+
return new OpenSearchRequestBuilder(createExprValueFactory(), getMaxResultWindow(), settings);
264260
}
265261

266262
public OpenSearchResourceMonitor createOpenSearchResourceMonitor() {
@@ -270,10 +266,6 @@ public OpenSearchResourceMonitor createOpenSearchResourceMonitor() {
270266
public OpenSearchRequest buildRequest(OpenSearchRequestBuilder requestBuilder) {
271267
final TimeValue cursorKeepAlive = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE);
272268
return requestBuilder.build(
273-
indexName,
274-
getMaxResultWindow(),
275-
cursorKeepAlive,
276-
client,
277-
cachedFieldOpenSearchTypes.isEmpty());
269+
indexName, cursorKeepAlive, client, cachedFieldOpenSearchTypes.isEmpty());
278270
}
279271
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanQueryBuilder.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import java.util.stream.Collectors;
1414
import lombok.EqualsAndHashCode;
1515
import org.apache.commons.lang3.tuple.Pair;
16+
import org.apache.logging.log4j.LogManager;
17+
import org.apache.logging.log4j.Logger;
1618
import org.opensearch.index.query.QueryBuilder;
1719
import org.opensearch.sql.ast.tree.Sort;
1820
import org.opensearch.sql.common.utils.StringUtils;
@@ -23,6 +25,7 @@
2325
import org.opensearch.sql.expression.ReferenceExpression;
2426
import org.opensearch.sql.expression.function.OpenSearchFunctions;
2527
import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder;
28+
import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder.PushDownUnSupportedException;
2629
import org.opensearch.sql.opensearch.storage.script.filter.FilterQueryBuilder;
2730
import org.opensearch.sql.opensearch.storage.script.sort.SortQueryBuilder;
2831
import org.opensearch.sql.opensearch.storage.serialization.DefaultExpressionSerializer;
@@ -41,6 +44,7 @@
4144
@VisibleForTesting
4245
@EqualsAndHashCode
4346
class OpenSearchIndexScanQueryBuilder implements PushDownQueryBuilder {
47+
private static final Logger LOG = LogManager.getLogger(OpenSearchIndexScanQueryBuilder.class);
4448

4549
final OpenSearchRequestBuilder requestBuilder;
4650

@@ -71,8 +75,18 @@ public boolean pushDownSort(LogicalSort sort) {
7175

7276
@Override
7377
public boolean pushDownLimit(LogicalLimit limit) {
74-
requestBuilder.pushDownLimit(limit.getLimit(), limit.getOffset());
75-
return true;
78+
try {
79+
requestBuilder.pushDownLimit(limit.getLimit(), limit.getOffset());
80+
return true;
81+
} catch (PushDownUnSupportedException e) {
82+
if (LOG.isDebugEnabled()) {
83+
LOG.debug(
84+
"Cannot pushdown limit {} with offset {}", limit.getLimit(), limit.getOffset(), e);
85+
} else {
86+
LOG.info("Cannot pushdown limit {} with offset {}", limit.getLimit(), limit.getOffset());
87+
}
88+
return false;
89+
}
7690
}
7791

7892
@Override

opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,12 +178,12 @@ void explain_successfully() {
178178
OpenSearchExprValueFactory exprValueFactory = mock(OpenSearchExprValueFactory.class);
179179
final var name = new OpenSearchRequest.IndexName("test");
180180
final int maxResultWindow = 10000;
181-
final var requestBuilder = new OpenSearchRequestBuilder(exprValueFactory, settings);
181+
final var requestBuilder =
182+
new OpenSearchRequestBuilder(exprValueFactory, maxResultWindow, settings);
182183
PhysicalPlan plan =
183184
new OpenSearchIndexScan(
184185
mock(OpenSearchClient.class),
185-
requestBuilder.build(
186-
name, maxResultWindow, settings.getSettingValue(SQL_CURSOR_KEEP_ALIVE), client));
186+
requestBuilder.build(name, settings.getSettingValue(SQL_CURSOR_KEEP_ALIVE), client));
187187

188188
AtomicReference<ExplainResponse> result = new AtomicReference<>();
189189
executor.explain(

opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,8 @@ void test_protect_indexScan() {
119119

120120
final var name = new OpenSearchRequest.IndexName(indexName);
121121
final var request =
122-
new OpenSearchRequestBuilder(exprValueFactory, settings)
123-
.build(
124-
name,
125-
maxResultWindow,
126-
settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE),
127-
client);
122+
new OpenSearchRequestBuilder(exprValueFactory, maxResultWindow, settings)
123+
.build(name, settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE), client);
128124
assertEquals(
129125
PhysicalPlanDSL.project(
130126
PhysicalPlanDSL.limit(

0 commit comments

Comments
 (0)