Add sanity test script #3564

Open · wants to merge 8 commits into `main`
141 changes: 141 additions & 0 deletions scripts/SanityTestScript/README.md
@@ -0,0 +1,141 @@
# Sanity Test Script

### Description
This Python script executes test queries from a CSV file against the PPL query API and generates comprehensive test reports.

**Collaborator:** What is the difference from the existing IT? When should we use the SanityTest script?

**Collaborator (author):** Without this script, we would need to run the sanity test manually by submitting PPL queries one by one and recording the results.

In the sanity test for 3.0.0, we used the same PPL queries and data as opensearch-spark: https://github.com/opensearch-project/opensearch-spark/blob/main/integ-test/script/test_cases.csv. But we plan to build more factual tests next time, which should be the major difference from the existing IT.

**Collaborator:**

> Without this script, we need to do the sanity test manually by submitting PPL queries one by one and recording the results.

We can run IT/YamlIT against a remote cluster: `./gradlew ':integ-test:yamlRestTest' -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername="yamlRestTest-0"`

@dai-chen asked a similar question here.

My thoughts:

- Existing IT can stay as IT.
- New commands/functions and bug fixes could migrate to YamlIT.
- The tests in this PR can be considered a comprehensive test set, which can be leveraged to verify PPL behavior across both Spark and OpenSearch.

**Collaborator (author) @qianheng-aws · Apr 24, 2025:** It seems yamlRestTest can already satisfy our requirements for the sanity test, including result matching. What's your opinion? @LantaoJin

@penghuo By the way, does this test framework support concurrency testing?

**Member @LantaoJin · Apr 24, 2025:** @penghuo 1. Do we have any doc on how to run the yaml tests (prepare data and queries)? 2. Are yamlTest and the sanity test equivalent? (Can we sign off the sanity requirement if yamlTest passes?)

**Collaborator:**

> @penghuo By the way, does this test framework support concurrency testing?

Not sure; I think it should, since it is borrowed from opensearch-core.

> 1. Do we have any doc on how to run the yaml tests (prepare data and queries)?

Data and queries are self-contained for each feature. For instance, in opensearch-core each aggregation feature is a separate file.

> 2. Are yamlTest and the sanity test equivalent?

I think so. We can add the sanity tests as yamlTest, run yamlTest against a remote cluster, and then sign off.


The script produces two report types:
1. An Excel report with detailed test information for each query
2. A JSON report containing both test result overview and query-specific details

Apart from the basic features, it also provides some advanced functionality (see the example invocation after this list):
1. Concurrent query execution (note: the async query service has session limits, so keep the number of worker threads moderate even though session ID reuse is supported)
2. Configurable query timeout for the requests
3. Flexible row selection, by specifying the start row and end row of the input CSV file
4. Expected status validation when an `expected_status` column is present in the CSV
5. Ability to generate partial reports if testing is interrupted
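
For example, a full invocation that exercises the concurrency, timeout, and row-selection options could look like the following (the flags come from the script's help output; the URL, credentials, and row range are placeholders):
```shell
# Run rows 10-49 of the CSV (the end row is excluded) with 4 workers and a 5-minute timeout per query
python SanityTest.py \
  --base-url http://localhost:9200 \
  --username admin --password admin \
  --input-csv test_queries.csv \
  --output-file test_report \
  --max-workers 4 --timeout 300 \
  --start-row 10 --end-row 50
```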

### Usage
To use this script, you need to have Python **3.6** or higher installed (Python **3.9**+ if you want partial reports on interrupt, since the signal handler calls `ThreadPoolExecutor.shutdown(cancel_futures=True)`). It also requires the following Python libraries:
```shell
pip install requests pandas openpyxl
```

After installing the required libraries, you can run the script from your shell with the following command-line parameters:
```shell
python SanityTest.py --base-url ${BASE_URL} --username *** --password *** --input-csv test_queries.csv --output-file test_report
```
Replace the placeholders with your actual `BASE_URL` and the username and password used for authentication to your endpoint.

When running against a local cluster, set `BASE_URL` to `http://localhost:9200`. You can launch an OpenSearch cluster with the SQL plugin locally from the opensearch-sql repository directory with:
```shell
./gradlew run
```
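
Before running the full suite, you can optionally confirm the endpoint and credentials with a single request against the same `_plugins/_ppl` API the script uses (a minimal sketch; the index name is the one from the sample CSV and the credentials are placeholders):
```shell
curl -u "${USERNAME}:${PASSWORD}" -X POST "${BASE_URL}/_plugins/_ppl" \
  -H 'Content-Type: application/json' \
  -d '{"query": "source=opensearch_dashboards_sample_data_flights | head 1"}'
```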

For more details on the command-line parameters, see the help output:
```shell
python SanityTest.py --help

usage: SanityTest.py [-h] --base-url BASE_URL --username USERNAME --password PASSWORD --input-csv INPUT_CSV --output-file OUTPUT_FILE
                     [--max-workers MAX_WORKERS] [--timeout TIMEOUT]
                     [--start-row START_ROW] [--end-row END_ROW]
                     [--log-level LOG_LEVEL]

Run tests from a CSV file and generate a report.

options:
  -h, --help            show this help message and exit
  --base-url BASE_URL   OpenSearch Cluster Connect URL of the service
  --username USERNAME   Username for authentication
  --password PASSWORD   Password for authentication
  --input-csv INPUT_CSV
                        Path to the CSV file containing test queries
  --output-file OUTPUT_FILE
                        Path to the output report file
  --max-workers MAX_WORKERS
                        optional, Maximum number of worker threads (default: 2)
  --timeout TIMEOUT     optional, Timeout in seconds (default: 600)
  --start-row START_ROW
                        optional, The start row of the query to run, start from 1
  --end-row END_ROW     optional, The end row of the query to run, not included
  --log-level LOG_LEVEL
                        optional, Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL, default: INFO)
```

### Input CSV File
As noted in the description, the input CSV file must contain at least a `query` column. An optional `query_name` column is also recognized and used in the reports (defaulting to `Q<row>` when absent), as is an optional `expected_status` column whose value can be `SUCCESS`, `FAILED`, or `TIMEOUT`. The script checks the actual status against the expected status and adds a `check_status` column with the result: `true` means the status check passed; `false` means it failed.
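
For reference, a CSV following this layout (mirroring the bundled `test_queries.csv` shown at the bottom of this PR) looks like:
```csv
query_name,query,expected_status
Successful Demo,"source=opensearch_dashboards_sample_data_flights | patterns FlightDelayType | stats count() by patterns_field",SUCCESS
Failed Demo,"source=opensearch_dashboards_sample_data_flights_2 | patterns FlightDelayType | stats count() by patterns_field",FAILED
```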

### JSON Report
The JSON report provides the same information as the Excel report, but in JSON format. In addition, it includes a statistical summary of the test results at the beginning of the report.

An example JSON report:
```json
{
  "summary": {
    "total_queries": 2,
    "check_failed": 0,
    "successful_queries": 1,
    "failed_queries": 1,
    "timeout_queries": 0,
    "execution_time": 1.245655
  },
  "detailed_results": [
    {
      "seq_id": 1,
      "query_name": "Successful Demo",
      "query": "source=opensearch_dashboards_sample_data_flights | patterns FlightDelayType | stats count() by patterns_field",
      "expected_status": "SUCCESS",
      "status": "SUCCESS",
      "duration": 0.843509,
      "start_time": "2025-04-22 16:30:22.461069",
      "end_time": "2025-04-22 16:30:23.304578",
      "result": {
        "schema": [
          {
            "name": "count()",
            "type": "int"
          },
          {
            "name": "patterns_field",
            "type": "string"
          }
        ],
        "datarows": [
          [
            9311,
            " "
          ],
          [
            689,
            " "
          ]
        ],
        "total": 2,
        "size": 2
      },
      "check_status": true
    },
    {
      "seq_id": 2,
      "query_name": "Failed Demo",
      "query": "source=opensearch_dashboards_sample_data_flights_2 | patterns FlightDelayType | stats count() by patterns_field",
      "expected_status": "FAILED",
      "status": "FAILED",
      "duration": 0.402146,
      "start_time": "2025-04-22 16:30:22.461505",
      "end_time": "2025-04-22 16:30:22.863651",
      "error": {
        "error": "404 Client Error: Not Found for url: http://localhost:9200/_plugins/_ppl",
        "response": {
          "error": {
            "reason": "Error occurred in OpenSearch engine: no such index [opensearch_dashboards_sample_data_flights_2]",
            "details": "[opensearch_dashboards_sample_data_flights_2] IndexNotFoundException[no such index [opensearch_dashboards_sample_data_flights_2]]\nFor more details, please send request for Json format to see the raw response from OpenSearch engine.",
            "type": "IndexNotFoundException"
          },
          "status": 404
        }
      },
      "check_status": true
    }
  ]
}
```
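
If you want to gate a CI job on the report, the summary block can be checked with a few lines of Python (a sketch only; the field names follow the example above, and the report path matches whatever you passed to `--output-file`):
```python
import json
import sys

# Load the JSON report produced by the script (assumed here to be test_report.json)
with open("test_report.json") as f:
    report = json.load(f)

summary = report["summary"]
# "check_failed" counts queries whose actual status did not match expected_status
# (queries without an expected_status are also counted, per the script's logic).
if summary["check_failed"] > 0:
    print(f"Sanity check failed: {summary}")
    sys.exit(1)

print(f"All {summary['total_queries']} queries matched their expected status "
      f"in {summary['execution_time']:.2f}s")
```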
211 changes: 211 additions & 0 deletions scripts/SanityTestScript/SanityTest.py
@@ -0,0 +1,211 @@
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0

import signal
import sys
import requests
import json
import csv
import logging
from datetime import datetime
import pandas as pd
import argparse
from requests.auth import HTTPBasicAuth
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

"""
Environment: python3

Example to use this script:

python SanityTest.py --base-url ${URL_ADDRESS} --username *** --password *** --input-csv test_queries.csv --output-file test_report --max-workers 2

The input file test_queries.csv should contain column: `query`

For more details, please use command:

python SanityTest.py --help

"""

class PPLTester:
    def __init__(self, base_url, username, password, max_workers, timeout, output_file, start_row, end_row, log_level):
        self.base_url = base_url
        self.auth = HTTPBasicAuth(username, password)
        self.headers = { 'Content-Type': 'application/json' }
        self.max_workers = max_workers
        self.timeout = timeout
        self.output_file = output_file
        self.start = start_row - 1 if start_row else None
        self.end = end_row - 1 if end_row else None
        self.log_level = log_level
        self.logger = self._setup_logger()
        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
        self.thread_local = threading.local()
        self.test_results = []

    def _setup_logger(self):
        logger = logging.getLogger('PPLTester')
        logger.setLevel(self.log_level)

        fh = logging.FileHandler('PPL_test.log')
        fh.setLevel(self.log_level)

        ch = logging.StreamHandler()
        ch.setLevel(self.log_level)

        formatter = logging.Formatter(
            '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
        )
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)

        logger.addHandler(fh)
        logger.addHandler(ch)

        return logger


    # Submit the query to the PPL endpoint
    def submit_query(self, query):
        url = f"{self.base_url}/_plugins/_ppl"
        response_json = None
        payload = {
            "query": query,
        }
        try:
            response = requests.post(url, auth=self.auth, headers=self.headers, json=payload, timeout=self.timeout)
            response_json = response.json()
            response.raise_for_status()
            return response_json
        except Exception as e:
            return {"error": str(e), "response": response_json}

    # Run the test and return the result
    def run_test(self, query, seq_id, query_name, expected_status):
        self.logger.info(f"Starting test: {seq_id}, {query}")
        start_time = datetime.now()
        submit_result = self.submit_query(query)
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        status = "SUCCESS"

        result = {
            "seq_id": seq_id,
            "query_name": query_name,
            "query": query,
            "expected_status": expected_status,
            "status": status,
            "duration": duration,
            "start_time": start_time,
            "end_time": end_time
        }

        if "error" in submit_result:
            self.logger.warning(f"Submit query with error: {submit_result}")
            status = "TIMEOUT" if "read timed out" in submit_result.get("error", "").lower() else "FAILED"
            result["status"] = status
            result["error"] = submit_result
        else:
            self.logger.debug(f"Submit return: {submit_result}")
            result["result"] = submit_result

        check_status = status.lower() == expected_status.lower() if expected_status else False
        result["check_status"] = check_status
        return result

    def run_tests_from_csv(self, csv_file):
        with open(csv_file, 'r') as f:
            reader = csv.DictReader(f)
            queries = [(row['query'], i, row.get('query_name', f'Q{i}'), row.get('expected_status', None)) for i, row in enumerate(reader, start=1) if row['query'].strip()]

        # Filter queries based on start and end rows
        queries = queries[self.start:self.end]

        # Parallel execution
        futures = [self.executor.submit(self.run_test, query, seq_id, query_name, expected_status) for query, seq_id, query_name, expected_status in queries]
        for future in as_completed(futures):
            result = future.result()
            self.test_results.append(result)

    def generate_report(self):
        self.logger.info("Generating report...")
        total_queries = len(self.test_results)
        check_failed_queries = sum(1 for r in self.test_results if not r['check_status'])
        successful_queries = sum(1 for r in self.test_results if r['status'] == 'SUCCESS')
        failed_queries = sum(1 for r in self.test_results if r['status'] == 'FAILED')
        timeout_queries = sum(1 for r in self.test_results if r['status'] == 'TIMEOUT')

        # Create report
        report = {
            "summary": {
                "total_queries": total_queries,
                "check_failed": check_failed_queries,
                "successful_queries": successful_queries,
                "failed_queries": failed_queries,
                "timeout_queries": timeout_queries,
                "execution_time": sum(r['duration'] for r in self.test_results)
            },
            "detailed_results": sorted(self.test_results, key=lambda r: r['seq_id'])
        }

        # Save report to JSON file
        with open(f"{self.output_file}.json", 'w') as f:
            json.dump(report, f, indent=2, default=str)

        # Save results to Excel file
        df = pd.DataFrame(self.test_results)
        df.to_excel(f"{self.output_file}.xlsx", index=False)

        self.logger.info(f"Generated report in {self.output_file}.xlsx and {self.output_file}.json")

def signal_handler(sig, frame, tester):
    print(f"Signal {sig} received, generating report...")
    try:
        tester.executor.shutdown(wait=False, cancel_futures=True)
        tester.generate_report()
    finally:
        sys.exit(0)

def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Run tests from a CSV file and generate a report.")
    parser.add_argument("--base-url", required=True, help="Base URL of the service")
    parser.add_argument("--username", required=True, help="Username for authentication")
    parser.add_argument("--password", required=True, help="Password for authentication")
    parser.add_argument("--input-csv", required=True, help="Path to the CSV file containing test queries")
    parser.add_argument("--output-file", required=True, help="Path to the output report file")
    parser.add_argument("--max-workers", type=int, default=2, help="optional, Maximum number of worker threads (default: 2)")
    parser.add_argument("--timeout", type=int, default=600, help="optional, Timeout in seconds (default: 600)")
    parser.add_argument("--start-row", type=int, default=None, help="optional, The start row of the query to run, start from 1")
    parser.add_argument("--end-row", type=int, default=None, help="optional, The end row of the query to run, not included")
    parser.add_argument("--log-level", default="INFO", help="optional, Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL, default: INFO)")

    args = parser.parse_args()

    tester = PPLTester(
        base_url=args.base_url,
        username=args.username,
        password=args.password,
        max_workers=args.max_workers,
        timeout=args.timeout,
        output_file=args.output_file,
        start_row=args.start_row,
        end_row=args.end_row,
        log_level=args.log_level,
    )

    # Register signal handlers to generate a report on interrupt
    signal.signal(signal.SIGINT, lambda sig, frame: signal_handler(sig, frame, tester))
    signal.signal(signal.SIGTERM, lambda sig, frame: signal_handler(sig, frame, tester))

    # Run tests
    tester.run_tests_from_csv(args.input_csv)

    # Generate report
    tester.generate_report()

if __name__ == "__main__":
    main()
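
For quick experiments, the tester can also be driven from Python rather than the CLI. A minimal sketch, assuming the file above is importable as a module named `SanityTest` and using placeholder connection details (replace them with real values for your cluster):
```python
from SanityTest import PPLTester  # assumes SanityTest.py is on PYTHONPATH

# Placeholder URL and credentials; the constructor arguments mirror the CLI flags.
tester = PPLTester(
    base_url="http://localhost:9200",
    username="admin",
    password="admin",
    max_workers=2,
    timeout=600,
    output_file="test_report",
    start_row=None,
    end_row=None,
    log_level="INFO",
)

tester.run_tests_from_csv("test_queries.csv")  # run every non-empty query in the CSV
tester.generate_report()                       # writes test_report.json and test_report.xlsx
```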
3 changes: 3 additions & 0 deletions scripts/SanityTestScript/test_queries.csv
@@ -0,0 +1,3 @@
query_name,query,expected_status
Successful Demo,"source=opensearch_dashboards_sample_data_flights | patterns FlightDelayType | stats count() by patterns_field",SUCCESS

**Collaborator:** Should we assert the result?

**Collaborator (author):** It was migrated from the opensearch-spark repo and doesn't support checking results currently. We plan to support that feature later.

**Member:** It brings up that we could also assert results by comparing runs with Calcite enabled and disabled in the script.

Failed Demo,"source=opensearch_dashboards_sample_data_flights_2 | patterns FlightDelayType | stats count() by patterns_field",FAILED