-
Notifications
You must be signed in to change notification settings - Fork 158
Support Sort pushdown #3620
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Support Sort pushdown #3620
Conversation
Signed-off-by: Yuanchun Shen <[email protected]> Copy traits to logical index scan after pushing down sort Signed-off-by: Yuanchun Shen <[email protected]> Allow pushing down multiple times Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
Additionally correct merge collation method Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
… in sort by fields Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
// `Sort($1)\n TableScan(name, age)` may become | ||
// `Sort($0)\n Project(age)\n TableScan(name, age)` after projection. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why Sort($1)\n TableScan(name, age)
may become Sort($0)\n Project(age)\n TableScan(name, age)
? SortRule should pushdown sort already?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
During optimization, the index in collations should point to its location in the current row instead of its location in the original table. As a result, even if sort is already pushed down, the trait RelCollation, which represents the sort status of the relation being optimized, should be updated to reflect the latest order status when there is a projection after sort.
...search/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java
Show resolved
Hide resolved
...search/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java
Show resolved
Hide resolved
if (LOG.isDebugEnabled()) { | ||
LOG.debug("Cannot pushdown the sort {}", collations, e); | ||
} else { | ||
LOG.warn("Cannot pushdown the sort {}, ", collations); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
santize collations? does it include field name or value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Replaced with their field names.
? SortOrder.DESC | ||
: SortOrder.ASC; | ||
// TODO: support script sort and distance sort | ||
SortBuilder<?> sortBuilder; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there IT/UT cover this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I haven't found any. Do you know the proper syntax for sorting geopoints or with script? I tried sorting on geopoints, it reports can't sort on geo_point field without using specific sorting feature, like geo_distance
. But I haven't figured out the correct syntax for sorting geopoints (it's not in the g4 file or the documentation).
@@ -1,6 +1,6 @@ | |||
{ | |||
"calcite": { | |||
"logical": "LogicalSort(sort0=[$0], dir0=[ASC])\n LogicalProject(age=[$8])\n LogicalFilter(condition=[>($8, 30)])\n LogicalSort(sort0=[$8], dir0=[ASC])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", | |||
"physical": "EnumerableCalc(expr#0=[{inputs}], expr#1=[30], expr#2=[>($t0, $t1)], age=[$t0], $condition=[$t2])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" | |||
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], SORT->[{\n \"age\" : {\n \"order\" : \"asc\"\n }\n}], FILTER->>($0, 30)], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add another IT on timestamp field sort? source=xxx | sort @timestamp
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
CalciteLogicalIndexScan newScan = | ||
new CalciteLogicalIndexScan( | ||
getCluster(), | ||
traitsWithCollations, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After adding collation by pushing down sort, we should remove them if pushing down aggregation afterwards, as aggregation will eliminate any collation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
Additionally remove sort when pushing down aggregation after adding collation by pushing down sort Signed-off-by: Yuanchun Shen <[email protected]>
…into pushdown-sort
Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
I found a corner case bug when sorting nested fields: sorting nested text fields does not leverage its keyword. For example (mapping): {
"from": 0,
"timeout": "1m",
"_source": {
"includes": [
"name",
"address",
"id",
"age"
],
"excludes": []
},
"sort": [
{
"address.state": {
"order": "asc"
}
}
]
} But when executing the plan, I get the following response: {
"error": {
"reason": "Error occurred in OpenSearch engine: all shards failed",
"details": "Shard[0]: java.lang.IllegalArgumentException: Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. Please use a keyword field instead. Alternatively, set fielddata=true on [address.state] in order to load field data by uninverting the inverted index. Note that this can use significant memory.\n\nFor more details, please send request for Json format to see the raw response from OpenSearch engine.",
"type": "SearchPhaseExecutionException"
},
"status": 400
} In contrast, the DSL in v2 uses the keyword field instead of the text field itself. {
"from": 0,
"size": 10000,
"timeout": "1m",
"_source": {
"includes": [
"name",
"address",
"id",
"age"
],
"excludes": []
},
"sort": [
{
"address.state.keyword": {
"order": "asc",
"missing": "_first"
}
}
]
} The plan in v2 can be executed with the following result. Although it's not quite correct. The engine seems to pick a random {
"schema": [
{
"name": "address.state",
"type": "string"
}
],
"datarows": [
[
"Florida"
],
[
"CA"
],
[
"NY"
],
[
"NC"
],
[
"TX"
]
],
"total": 5,
"size": 5
} Update |
Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
Another corner case to be fixed: sort is not pushed down when when there is a filter before it. The physical plan for the query
However, in v2, the plan for it is :
From the plan, Update
|
@@ -1,6 +1,6 @@ | |||
{ | |||
"calcite": { | |||
"logical": "LogicalSort(sort0=[$0], dir0=[ASC])\n LogicalProject(age=[$8])\n LogicalFilter(condition=[>($8, 30)])\n LogicalSort(sort0=[$8], dir0=[ASC])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", | |||
"physical": "EnumerableCalc(expr#0=[{inputs}], expr#1=[30], expr#2=[>($t0, $t1)], age=[$t0], $condition=[$t2])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" | |||
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], SORT->[{\n \"age\" : {\n \"order\" : \"asc\",\n \"missing\" : \"_last\"\n }\n}], FILTER->>($0, 30)], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_last\"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_last\"}}]
Here _last
is different with v2's behaviour, in v2, default null direction (missing) for ASC is first
. But in Calcite, the ASC maps to last
:
https://github.com/apache/calcite/blob/2fb437b93774d3d69df66547715b374d9281fdbb/core/src/main/java/org/apache/calcite/rel/RelFieldCollation.java#L134-L136
We can add a TODO to align with v2, but my question is that is the behaviour of v2 correct? @dai-chen @penghuo any thoughts about it ?
this issue is not a blocker for this PR.
{ | ||
"name": "OpenSearchIndexScan", | ||
"description": { | ||
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\"}, needClean=true, searchDone=false, pitId=s9y3QQEhb3BlbnNlYXJjaC1zcWxfdGVzdF9pbmRleF9hY2NvdW50Fndla1VpMi1kVHh5Qi1lYnhPQnlSbXcAFkU4Qm9UVURIUWI2a3pjNkhmQkxvc2cAAAAAAAAAAAsWcnUyVHZfNk1SeC1DMFNFdGtPcDN4QQEWd2VrVWkyLWRUeHlCLWVieE9CeVJtdwAA, cursorKeepAlive=1m, searchAfter=null, searchResponse=null)" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catching, the v2 can not pushdown with rename
.
// TODO: Remove pushed-down sort in DSL in expectedOutput/ppl/explain_sort_then_agg_push.json | ||
// existing collations should be eliminated when pushing down aggregations |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this TODO for v2?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Added a (v2)
in comment.
// TODO: Fix the expected output in expectedOutput/ppl/explain_multi_sort_push.json | ||
// balance and gender should take precedence over account_number and firstname |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems another bug of v2 sort.
source=opensearch_dashboards_sample_data_logs | head 10 | sort bytes | sort - bytes
with DSL pushdown
{
"from": 0,
"size": 10,
"timeout": "1m",
"sort": [{
"bytes": {
"order": "asc",
"missing": "_first"
}
}, {
"bytes": {
"order": "desc",
"missing": "_last"
}
}]
}
the final result is sorted ASC.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, in v2 it merges collations by simply concatenating them without de-duplication and re-order.
case FIRST -> "_first"; | ||
case LAST -> "_last"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a workaround to align with v2 is changing to
case FIRST -> "_last";
case LAST -> "_first";
But not recommended.
if (LOG.isDebugEnabled()) { | ||
LOG.debug("Cannot pushdown the sort {}", getCollationNames(collations), e); | ||
} else { | ||
LOG.warn("Cannot pushdown the sort {}, ", getCollationNames(collations)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change to info
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
|
||
/** | ||
* Check if the sort by collations contains any aggregators that are pushed down. E.g. In `stats | ||
* avg(age) as avg_age by state | sort avg_age`, the sort clause has `avg_age` which is an |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it may have a more complex case:
avg(age) as avg_age by state | rename avg_age as new_avg_age | sort new_avg_age
I don't think the current hasAggregatorInSortBy
works for that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It works. This case is covered by testSortWithAggregationExplain
. The query source=opensearch-sql_test_index_account| stats avg(age) AS avg_age by state, city | sort avg_age
won't be pushed down:
EnumerableSort(sort0=[$0], dir0=[ASC])
EnumerableCalc(expr#0..2=[{inputs}], avg_age=[$t2], state=[$t0], city=[$t1])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[state, city, age], AGGREGATION->rel#1064:LogicalAggregate.NONE.[](input=RelSubset#1063,group={0, 1},avg_age=AVG($2))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","_source":{"includes":["state","city","age"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"city":{"terms":{"field":"city.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"avg_age":{"avg":{"field":"age"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
When sorting avg_age
, the field names by getRowType().getFieldNames()
is already the renamed ones (state
, city
, avg_age
).
@yuancu do we have follow cases in ExplainIT
|
…sort Signed-off-by: Yuanchun Shen <[email protected]>
Added the test cases & fixed the behavior. |
…e IT Signed-off-by: Yuanchun Shen <[email protected]>
Description
Support Sort pushdown
Related Issues
Resolves #3380
Check List
--signoff
.By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.