Support Sort pushdown #3620

Open
wants to merge 18 commits into
base: main

yuancu
Contributor

@yuancu yuancu commented May 13, 2025

Description

Support Sort pushdown

Related Issues

Resolves #3380

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • New functionality has javadoc added.
  • New functionality has a user manual doc added.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@LantaoJin LantaoJin added the calcite calcite migration releated label May 22, 2025
yuancu added 3 commits June 20, 2025 14:58
Signed-off-by: Yuanchun Shen <[email protected]>

Copy traits to logical index scan after pushing down sort

Signed-off-by: Yuanchun Shen <[email protected]>

Allow pushing down multiple times

Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
Additionally correct merge collation method

Signed-off-by: Yuanchun Shen <[email protected]>
@yuancu yuancu changed the title WIP: Support Sort pushdown Support Sort pushdown Jun 23, 2025
@yuancu yuancu marked this pull request as ready for review June 23, 2025 14:59
@yuancu yuancu requested a review from qianheng-aws as a code owner June 23, 2025 14:59
Signed-off-by: Yuanchun Shen <[email protected]>
Comment on lines +138 to +139
// `Sort($1)\n TableScan(name, age)` may become
// `Sort($0)\n Project(age)\n TableScan(name, age)` after projection.
Collaborator

Why would Sort($1)\n TableScan(name, age) become Sort($0)\n Project(age)\n TableScan(name, age)? Shouldn't SortRule have pushed the sort down already?

Contributor Author

@yuancu yuancu Jun 24, 2025

During optimization, the index in collations should point to its location in the current row instead of its location in the original table. As a result, even if sort is already pushed down, the trait RelCollation, which represents the sort status of the relation being optimized, should be updated to reflect the latest order status when there is a projection after sort.
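
To illustrate the index shift described above, here is a minimal sketch in plain Java. It does not use Calcite; the class `CollationRemapSketch` and its `remap` method are hypothetical names introduced only for this example, and the projection is modeled as a simple list of input column indexes.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical illustration only; not the PR's code and not Calcite's API. */
final class CollationRemapSketch {

  /**
   * Translates a sort key expressed against the scan's row into the index of the
   * same column in the projected row. projectedInputs.get(i) is the input column
   * that output column i reads from. Returns empty if the column is projected away.
   */
  static Optional<Integer> remap(int sortIndex, List<Integer> projectedInputs) {
    int pos = projectedInputs.indexOf(sortIndex);
    return pos < 0 ? Optional.empty() : Optional.of(pos);
  }

  public static void main(String[] args) {
    // TableScan(name, age): name is $0 and age is $1, so Sort($1) sorts by age.
    // Project(age) keeps only input column 1, so age becomes $0 in the new row.
    List<Integer> projectAge = List.of(1);
    System.out.println(remap(1, projectAge)); // Optional[0]
    // A sort on name ($0) has no position in the projected row.
    System.out.println(remap(0, projectAge)); // Optional.empty
  }
}
```

In this model, the collation trait kept on the scan after projection would carry index 0 rather than 1, which matches the Sort($0) over Project(age) form mentioned in the code comment.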

Comment on lines 298 to 302
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot pushdown the sort {}", collations, e);
} else {
LOG.warn("Cannot pushdown the sort {}, ", collations);
}
Collaborator

Sanitize collations? Do they include field names or values?

Contributor Author

Fixed. Replaced with their field names.

? SortOrder.DESC
: SortOrder.ASC;
// TODO: support script sort and distance sort
SortBuilder<?> sortBuilder;
Collaborator

Is there an IT/UT that covers this?

Contributor Author

@yuancu yuancu Jun 24, 2025

No, I haven't found any. Do you know the proper syntax for sorting geopoints or sorting with a script? I tried sorting on geopoints, but it reports "can't sort on geo_point field without using specific sorting feature, like geo_distance". I haven't figured out the correct syntax for sorting geopoints (it's not in the g4 file or the documentation).

@@ -1,6 +1,6 @@
  {
  "calcite": {
  "logical": "LogicalSort(sort0=[$0], dir0=[ASC])\n LogicalProject(age=[$8])\n LogicalFilter(condition=[>($8, 30)])\n LogicalSort(sort0=[$8], dir0=[ASC])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
- "physical": "EnumerableCalc(expr#0=[{inputs}], expr#1=[30], expr#2=[>($t0, $t1)], age=[$t0], $condition=[$t2])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
+ "physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], SORT->[{\n \"age\" : {\n \"order\" : \"asc\"\n }\n}], FILTER->>($0, 30)], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
Collaborator

Add another IT on timestamp field sort? source=xxx | sort @timestamp

Contributor Author

Added.

CalciteLogicalIndexScan newScan =
new CalciteLogicalIndexScan(
getCluster(),
traitsWithCollations,
Collaborator

After adding collation by pushing down sort, we should remove them if pushing down aggregation afterwards, as aggregation will eliminate any collation.

Contributor Author

Fixed
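
A minimal sketch of the rule requested above, under the assumption that the scan tracks its collation as a plain list of field names. `ScanTraitsSketch` and its methods are hypothetical names for illustration and are not the classes changed in this PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a scan's collation trait; not the PR's implementation. */
final class ScanTraitsSketch {
  private final List<String> collation = new ArrayList<>();

  /** Pushing a sort down records the order the scan now produces. */
  void pushDownSort(List<String> sortFields) {
    collation.clear();
    collation.addAll(sortFields);
  }

  /** Aggregated output is no longer ordered by the earlier sort, so drop the collation. */
  void pushDownAggregation() {
    collation.clear();
  }

  List<String> collation() {
    return List.copyOf(collation);
  }

  public static void main(String[] args) {
    ScanTraitsSketch scan = new ScanTraitsSketch();
    scan.pushDownSort(List.of("age"));
    System.out.println(scan.collation()); // [age]
    scan.pushDownAggregation();
    System.out.println(scan.collation()); // []
  }
}
```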

@yuancu yuancu marked this pull request as draft June 24, 2025 05:11
yuancu added 6 commits June 24, 2025 17:47
Additionally remove sort when pushing down aggregation after adding collation by pushing down sort

Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
@yuancu yuancu marked this pull request as ready for review June 24, 2025 13:39
@yuancu
Contributor Author

yuancu commented Jun 25, 2025

I found a corner-case bug when sorting nested fields: sorting on a nested text field does not use its keyword sub-field.

For example (mapping):
For the query source=opensearch-sql_test_index_nested_simple | sort address.state, the DSL is as follows:

{
  "from": 0,
  "timeout": "1m",
  "_source": {
    "includes": [
      "name",
      "address",
      "id",
      "age"
    ],
    "excludes": []
  },
  "sort": [
    {
      "address.state": {
        "order": "asc"
      }
    }
  ]
}

But when executing the plan, I get the following response:

{
  "error": {
    "reason": "Error occurred in OpenSearch engine: all shards failed",
    "details": "Shard[0]: java.lang.IllegalArgumentException: Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. Please use a keyword field instead. Alternatively, set fielddata=true on [address.state] in order to load field data by uninverting the inverted index. Note that this can use significant memory.\n\nFor more details, please send request for Json format to see the raw response from OpenSearch engine.",
    "type": "SearchPhaseExecutionException"
  },
  "status": 400
}

In contrast, the DSL in v2 uses the keyword field instead of the text field itself.

{
  "from": 0,
  "size": 10000,
  "timeout": "1m",
  "_source": {
    "includes": [
      "name",
      "address",
      "id",
      "age"
    ],
    "excludes": []
  },
  "sort": [
    {
      "address.state.keyword": {
        "order": "asc",
        "missing": "_first"
      }
    }
  ]
}

The v2 plan executes and returns the result below, although the result is not quite correct: the engine seems to pick an arbitrary address.state from the nested list and to display a different one when projecting with fields.

{
    "schema": [
        {
            "name": "address.state",
            "type": "string"
        }
    ],
    "datarows": [
        [
            "Florida"
        ],
        [
            "CA"
        ],
        [
            "NY"
        ],
        [
            "NC"
        ],
        [
            "TX"
        ]
    ],
    "total": 5,
    "size": 5
}

Update
Fixed by pointing to its keyword field when sorting a text field (if possible).
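
The fix can be pictured with the sketch below. It assumes the index mapping is available as a flat map from field path to type and that the keyword sub-field is literally named `keyword`, as in `address.state.keyword` above. `SortFieldSketch` and `sortFieldName` are hypothetical names, not the PR's code.

```java
import java.util.Map;

/** Hypothetical sketch of choosing the field name to sort on; not the PR's code. */
final class SortFieldSketch {

  /**
   * Returns the keyword sub-field for a text field when one exists in the mapping,
   * otherwise returns the field unchanged. Assumes the sub-field is named "keyword".
   */
  static String sortFieldName(String field, Map<String, String> fieldTypes) {
    String keyword = field + ".keyword";
    boolean isText = "text".equals(fieldTypes.get(field));
    boolean hasKeyword = "keyword".equals(fieldTypes.get(keyword));
    return isText && hasKeyword ? keyword : field;
  }

  public static void main(String[] args) {
    Map<String, String> mapping = Map.of(
        "address.state", "text",
        "address.state.keyword", "keyword",
        "age", "long");
    System.out.println(sortFieldName("address.state", mapping)); // address.state.keyword
    System.out.println(sortFieldName("age", mapping));           // age
  }
}
```

A text field without a keyword sub-field is returned unchanged in this sketch, so the engine error shown earlier would still surface for such fields.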

@yuancu
Contributor Author

yuancu commented Jun 25, 2025

Another corner case to be fixed: sort is not pushed down when there is a filter before it.

The physical plan for the query source=bank | where birthdate < '2023-01-01 23:59:58.000000000' | sort birthdate | fields birthdate is:

EnumerableSort(sort0=[$0], dir0=[ASC])
  EnumerableCalc(expr#0=[{inputs}], expr#1=[TIMESTAMP($t0)], expr#2=['2023-01-01 23:59:58':EXPR_TIMESTAMP VARCHAR], expr#3=[<($t1, $t2)], birthdate=[$t0], $condition=[$t3])
    CalciteEnumerableIndexScan(table=[[OpenSearch, bank]], PushDownContext=[[PROJECT->[birthdate]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["birthdate"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

However, in v2, the plan for the same query is:

{
    "root": {
        "name": "ProjectOperator",
        "description": {
            "fields": "[birthdate]"
        },
        "children": [
            {
                "name": "OpenSearchIndexScan",
                "description": {
                    "request": "OpenSearchQueryRequest(indexName=bank, sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"birthdate\":{\"from\":null,\"to\":1672617598000,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"_source\":{\"includes\":[\"birthdate\"],\"excludes\":[]},\"sort\":[{\"birthdate\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, needClean=true, searchDone=false, pitId=*, cursorKeepAlive=1m, searchAfter=null, searchResponse=null)"
                },
                "children": []
            }
        ]
    }
}

From the plan, where ... | sort ... is pushed down in v2 but not in Calcite.

Update

penghuo
penghuo previously approved these changes Jun 26, 2025
@Swiddis Swiddis added the enhancement New feature or request label Jun 26, 2025
@@ -1,6 +1,6 @@
  {
  "calcite": {
  "logical": "LogicalSort(sort0=[$0], dir0=[ASC])\n LogicalProject(age=[$8])\n LogicalFilter(condition=[>($8, 30)])\n LogicalSort(sort0=[$8], dir0=[ASC])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
- "physical": "EnumerableCalc(expr#0=[{inputs}], expr#1=[30], expr#2=[>($t0, $t1)], age=[$t0], $condition=[$t2])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
+ "physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], SORT->[{\n \"age\" : {\n \"order\" : \"asc\",\n \"missing\" : \"_last\"\n }\n}], FILTER->>($0, 30)], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_last\"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
Member

\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_last\"}}]

Here _last differs from v2's behaviour: in v2, the default null direction (missing) for ASC is first, but in Calcite, ASC maps to last:
https://github.com/apache/calcite/blob/2fb437b93774d3d69df66547715b374d9281fdbb/core/src/main/java/org/apache/calcite/rel/RelFieldCollation.java#L134-L136

We can add a TODO to align with v2, but my question is: is the behaviour of v2 correct? @dai-chen @penghuo any thoughts?

this issue is not a blocker for this PR.
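
For reference, the two defaults being compared can be written side by side. This is a sketch only: `MissingOrderSketch` and its enum are hypothetical, the Calcite side follows the linked `RelFieldCollation` default (nulls last for ascending, first for descending), and the v2 side is inferred from the DSL quoted in this conversation (`_first` for asc, `_last` for desc).

```java
/** Hypothetical side-by-side of default null ordering; not the PR's code. */
final class MissingOrderSketch {
  enum Direction { ASC, DESC }

  /** Calcite-style default: nulls last for ascending, first for descending. */
  static String calciteDefaultMissing(Direction direction) {
    return direction == Direction.ASC ? "_last" : "_first";
  }

  /** v2 default as it appears in the DSL quoted in this thread. */
  static String v2DefaultMissing(Direction direction) {
    return direction == Direction.ASC ? "_first" : "_last";
  }

  public static void main(String[] args) {
    for (Direction d : Direction.values()) {
      System.out.println(d + ": calcite=" + calciteDefaultMissing(d)
          + ", v2=" + v2DefaultMissing(d));
    }
    // ASC: calcite=_last, v2=_first
    // DESC: calcite=_first, v2=_last
  }
}
```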

{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\"}, needClean=true, searchDone=false, pitId=s9y3QQEhb3BlbnNlYXJjaC1zcWxfdGVzdF9pbmRleF9hY2NvdW50Fndla1VpMi1kVHh5Qi1lYnhPQnlSbXcAFkU4Qm9UVURIUWI2a3pjNkhmQkxvc2cAAAAAAAAAAAsWcnUyVHZfNk1SeC1DMFNFdGtPcDN4QQEWd2VrVWkyLWRUeHlCLWVieE9CeVJtdwAA, cursorKeepAlive=1m, searchAfter=null, searchResponse=null)"
Member

Good catch: v2 cannot push down with rename.

Comment on lines 130 to 131
// TODO: Remove pushed-down sort in DSL in expectedOutput/ppl/explain_sort_then_agg_push.json
// existing collations should be eliminated when pushing down aggregations
Member

Is this TODO for v2?

Contributor Author

Yes. Added a (v2) in comment.

Comment on lines 112 to 113
// TODO: Fix the expected output in expectedOutput/ppl/explain_multi_sort_push.json
// balance and gender should take precedence over account_number and firstname
Member

@LantaoJin LantaoJin Jun 27, 2025

This seems to be another bug in v2 sort.

source=opensearch_dashboards_sample_data_logs  | head 10 | sort bytes | sort - bytes

with DSL pushdown

{
  "from": 0,
  "size": 10,
  "timeout": "1m",
      "sort": [{
        "bytes": {
            "order": "asc",
            "missing": "_first"
        }
    }, {
        "bytes": {
            "order": "desc",
            "missing": "_last"
        }
    }]
}

the final result is sorted ASC.

Contributor Author

Yes, v2 merges collations by simply concatenating them, without de-duplicating or re-ordering them.
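
One way to merge collations so that the later sort wins is sketched below. It is an assumption about the intended semantics (newer keys first, older keys kept only as tie-breakers on fields not already present), not a copy of the PR's merge method. Sort keys are encoded as `field:order` strings purely for illustration, and `CollationMergeSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of merging sort keys; not the PR's merge method. */
final class CollationMergeSketch {

  /** Extracts the field name from an illustrative "field:order" key. */
  static String fieldOf(String key) {
    return key.substring(0, key.indexOf(':'));
  }

  /**
   * Puts the incoming (later) sort keys first and appends existing keys only
   * for fields that the incoming sort does not already cover.
   */
  static List<String> merge(List<String> existing, List<String> incoming) {
    Map<String, String> byField = new LinkedHashMap<>();
    for (String key : incoming) {
      byField.putIfAbsent(fieldOf(key), key);
    }
    for (String key : existing) {
      byField.putIfAbsent(fieldOf(key), key);
    }
    return new ArrayList<>(byField.values());
  }

  public static void main(String[] args) {
    // sort bytes | sort - bytes: the later descending sort replaces the earlier one.
    System.out.println(merge(List.of("bytes:asc"), List.of("bytes:desc")));
    // [bytes:desc]

    // Later keys take precedence over earlier ones.
    System.out.println(merge(
        List.of("account_number:asc", "firstname:asc"),
        List.of("balance:asc", "gender:asc")));
    // [balance:asc, gender:asc, account_number:asc, firstname:asc]
  }
}
```

Plain concatenation would instead yield bytes:asc followed by bytes:desc for the first case, which is why the v2 result above comes back sorted ascending.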

Comment on lines +296 to +297
case FIRST -> "_first";
case LAST -> "_last";
Member

A workaround to align with v2 is to change this to

case FIRST -> "_last";
case LAST -> "_first";

But not recommended.

if (LOG.isDebugEnabled()) {
LOG.debug("Cannot pushdown the sort {}", getCollationNames(collations), e);
} else {
LOG.warn("Cannot pushdown the sort {}, ", getCollationNames(collations));
Member

Change to info.

Contributor Author

Fixed


/**
* Check if the sort by collations contains any aggregators that are pushed down. E.g. In `stats
* avg(age) as avg_age by state | sort avg_age`, the sort clause has `avg_age` which is an
Member

it may have a more complex case:

avg(age) as avg_age by state | rename avg_age as new_avg_age | sort new_avg_age

I don't think the current hasAggregatorInSortBy works for that.

Contributor Author

It works. This case is covered by testSortWithAggregationExplain. The query source=opensearch-sql_test_index_account| stats avg(age) AS avg_age by state, city | sort avg_age won't be pushed down:

EnumerableSort(sort0=[$0], dir0=[ASC])
  EnumerableCalc(expr#0..2=[{inputs}], avg_age=[$t2], state=[$t0], city=[$t1])
    CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[state, city, age], AGGREGATION->rel#1064:LogicalAggregate.NONE.[](input=RelSubset#1063,group={0, 1},avg_age=AVG($2))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","_source":{"includes":["state","city","age"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"city":{"terms":{"field":"city.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"avg_age":{"avg":{"field":"age"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

When sorting on avg_age, the field names returned by getRowType().getFieldNames() are already the renamed ones (state, city, avg_age).
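
The name-based check described here could look roughly like the sketch below. The method name `hasAggregatorInSortBy` comes from the review thread, but this signature and body are assumptions for illustration: the row's field names are passed in as a list (standing in for getRowType().getFieldNames()) and the pushed-down aggregator names as a set.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the check; signature and body are assumptions. */
final class AggregatorSortCheckSketch {

  /**
   * Returns true if any sort key index refers to a column whose current name
   * is one of the pushed-down aggregators.
   */
  static boolean hasAggregatorInSortBy(
      List<Integer> sortIndexes, List<String> rowFieldNames, Set<String> aggregatorNames) {
    for (int index : sortIndexes) {
      if (aggregatorNames.contains(rowFieldNames.get(index))) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<String> row = List.of("state", "city", "avg_age");
    Set<String> aggregators = Set.of("avg_age");
    // sort avg_age: index 2 names an aggregator, so the sort is not pushed down.
    System.out.println(hasAggregatorInSortBy(List.of(2), row, aggregators)); // true
    // sort state: index 0 is a group-by field.
    System.out.println(hasAggregatorInSortBy(List.of(0), row, aggregators)); // false
  }
}
```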

@LantaoJin
Member

LantaoJin commented Jun 27, 2025

@yuancu do we have the following cases in ExplainIT?

source=t | head 10 | sort field // pushdown limit only
source=t | sort field | head 10 // pushdown sort only or pushdown sort + limit (depends on the execution order of DSL)

@yuancu
Contributor Author

yuancu commented Jun 27, 2025

@yuancu do we have the following cases in ExplainIT?

source=t | head 10 | sort field // pushdown limit only
source=t | sort field | head 10 // pushdown sort only or pushdown sort + limit (depends on the execution order of DSL)

Added the test cases & fixed the behavior.
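
The ordering constraint behind these two cases can be sketched as follows. This is a hypothetical model, not the PR's PushDownContext: `PushDownOrderSketch` simply refuses to push a sort once a limit has been pushed, because sorting only the first N rows is not the same as taking the first N rows of the sorted data.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of push-down ordering; not the PR's PushDownContext. */
final class PushDownOrderSketch {
  private final List<String> pushed = new ArrayList<>();
  private boolean limitPushed;

  /** A limit can always be pushed in this simplified model. */
  boolean pushDownLimit(int size) {
    limitPushed = true;
    pushed.add("LIMIT->" + size);
    return true;
  }

  /** A sort is rejected after a limit, so it must run on the limited rows instead. */
  boolean pushDownSort(String field) {
    if (limitPushed) {
      return false;
    }
    pushed.add("SORT->" + field);
    return true;
  }

  List<String> pushed() {
    return List.copyOf(pushed);
  }

  public static void main(String[] args) {
    // source=t | head 10 | sort field
    PushDownOrderSketch headThenSort = new PushDownOrderSketch();
    headThenSort.pushDownLimit(10);
    headThenSort.pushDownSort("field");
    System.out.println(headThenSort.pushed()); // [LIMIT->10]

    // source=t | sort field | head 10
    PushDownOrderSketch sortThenHead = new PushDownOrderSketch();
    sortThenHead.pushDownSort("field");
    sortThenHead.pushDownLimit(10);
    System.out.println(sortThenHead.pushed()); // [SORT->field, LIMIT->10]
  }
}
```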

Labels
backport 2.19-dev calcite calcite migration releated enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEATURE] Calcite Engine Framework: pushdown sort
5 participants