Add sanity test script #3564

@@ -0,0 +1,141 @@
# Sanity Test Script

### Description
This Python script executes test queries from a CSV file using an asynchronous query API and generates comprehensive test reports.

The script produces two report types:
1. An Excel report with detailed test information for each query
2. A JSON report containing both a test result overview and query-specific details

Beyond the basic features, it also provides advanced functionality:
1. Concurrent query execution (note: the async query service has session limits, so use thread workers moderately even though session ID reuse is already supported)
2. Configurable query timeout for the requests
3. Flexible row selection by specifying a start row and end row of the input CSV file (the selection semantics are illustrated in the sketch after this list)
4. Expected status validation when `expected_status` is present in the CSV
5. Ability to generate partial reports if testing is interrupted
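For illustration, the 1-based, end-exclusive row selection maps onto a Python slice; the sketch below mirrors the slicing done in the script (the `rows` list is only a made-up example):
```python
# Illustration of --start-row / --end-row semantics: start_row is 1-based and
# inclusive, end_row is excluded (mirrors the slicing in SanityTest.py).
rows = ["q1", "q2", "q3", "q4", "q5"]  # stand-ins for CSV query rows

def select_rows(rows, start_row=None, end_row=None):
    # Convert the 1-based bounds into a 0-based slice; None keeps the full range.
    start = start_row - 1 if start_row else None
    end = end_row - 1 if end_row else None
    return rows[start:end]

print(select_rows(rows, start_row=2, end_row=4))  # ['q2', 'q3'] -- row 4 is not included
```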

### Usage
To use this script, you need Python **3.6** or higher. It also requires the following Python libraries:
```shell
pip install requests pandas openpyxl
```

After installing the required libraries, you can run the script with the following command line parameters in your shell:
```shell
python SanityTest.py --base-url ${BASE_URL} --username *** --password *** --input-csv test_queries.csv --output-file test_report
```
Replace the placeholders with your actual values of BASE_URL, USERNAME, and PASSWORD for authentication to your endpoint.

When running against a localhost cluster, `BASE_URL` should be set to `http://localhost:9200`. You can launch an OpenSearch cluster with the SQL plugin locally from the opensearch-sql repo directory with:
```shell
./gradlew run
```

For more details on the command line parameters, see the help manual via:
```shell
python SanityTest.py --help

usage: SanityTest.py [-h] --base-url BASE_URL --username USERNAME --password PASSWORD --input-csv INPUT_CSV --output-file OUTPUT_FILE
                     [--max-workers MAX_WORKERS] [--timeout TIMEOUT]
                     [--start-row START_ROW] [--end-row END_ROW]
                     [--log-level LOG_LEVEL]

Run tests from a CSV file and generate a report.

options:
  -h, --help            show this help message and exit
  --base-url BASE_URL   OpenSearch Cluster Connect URL of the service
  --username USERNAME   Username for authentication
  --password PASSWORD   Password for authentication
  --input-csv INPUT_CSV
                        Path to the CSV file containing test queries
  --output-file OUTPUT_FILE
                        Path to the output report file
  --max-workers MAX_WORKERS
                        optional, Maximum number of worker threads (default: 2)
  --timeout TIMEOUT     optional, Timeout in seconds (default: 600)
  --start-row START_ROW
                        optional, The start row of the query to run, start from 1
  --end-row END_ROW     optional, The end row of the query to run, not included
  --log-level LOG_LEVEL
                        optional, Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL, default: INFO)
```

### Input CSV File
As described above, the input CSV file must at least have a `query` column to run the tests. It also supports an optional `expected_status` column whose value can be `SUCCESS`, `FAILED` or `TIMEOUT`. The script checks the actual status against the expected status and generates a new `check_status` column with the result -- TRUE means the status check passed; FALSE means it failed.
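For reference, a minimal input CSV with these columns can be generated with Python's standard `csv` module; this is only a sketch, and the file name and queries below are illustrative:
```python
import csv

# Example rows; the `expected_status` column is optional and may be omitted.
rows = [
    {"query_name": "Demo 1",
     "query": "source=opensearch_dashboards_sample_data_flights | stats count()",
     "expected_status": "SUCCESS"},
    {"query_name": "Demo 2",
     "query": "source=no_such_index | stats count()",
     "expected_status": "FAILED"},
]

with open("test_queries.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["query_name", "query", "expected_status"])
    writer.writeheader()      # header row: query_name,query,expected_status
    writer.writerows(rows)    # fields containing commas are quoted automatically
```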

#### JSON Report
The JSON report provides the same information as the Excel report, but in JSON format. In addition, it includes a statistical summary of the test results at the beginning of the report.

An example of a JSON report:
```json
{
  "summary": {
    "total_queries": 2,
    "check_failed": 0,
    "successful_queries": 1,
    "failed_queries": 1,
    "timeout_queries": 0,
    "execution_time": 1.245655
  },
  "detailed_results": [
    {
      "seq_id": 1,
      "query_name": "Successful Demo",
      "query": "source=opensearch_dashboards_sample_data_flights | patterns FlightDelayType | stats count() by patterns_field",
      "expected_status": "SUCCESS",
      "status": "SUCCESS",
      "duration": 0.843509,
      "start_time": "2025-04-22 16:30:22.461069",
      "end_time": "2025-04-22 16:30:23.304578",
      "result": {
        "schema": [
          {
            "name": "count()",
            "type": "int"
          },
          {
            "name": "patterns_field",
            "type": "string"
          }
        ],
        "datarows": [
          [
            9311,
            " "
          ],
          [
            689,
            " "
          ]
        ],
        "total": 2,
        "size": 2
      },
      "check_status": true
    },
    {
      "seq_id": 2,
      "query_name": "Failed Demo",
      "query": "source=opensearch_dashboards_sample_data_flights_2 | patterns FlightDelayType | stats count() by patterns_field",
      "expected_status": "FAILED",
      "status": "FAILED",
      "duration": 0.402146,
      "start_time": "2025-04-22 16:30:22.461505",
      "end_time": "2025-04-22 16:30:22.863651",
      "error": {
        "error": "404 Client Error: Not Found for url: http://localhost:9200/_plugins/_ppl",
        "response": {
          "error": {
            "reason": "Error occurred in OpenSearch engine: no such index [opensearch_dashboards_sample_data_flights_2]",
            "details": "[opensearch_dashboards_sample_data_flights_2] IndexNotFoundException[no such index [opensearch_dashboards_sample_data_flights_2]]\nFor more details, please send request for Json format to see the raw response from OpenSearch engine.",
            "type": "IndexNotFoundException"
          },
          "status": 404
        }
      },
      "check_status": true
    }
  ]
}
```
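Once the script has written a report like the one above, the JSON file can be consumed programmatically. A minimal sketch, assuming the report was written to `test_report.json`:
```python
import json

# Load the JSON report produced by the script (the file name is an assumption).
with open("test_report.json") as f:
    report = json.load(f)

summary = report["summary"]
print(f"total={summary['total_queries']} "
      f"success={summary['successful_queries']} "
      f"failed={summary['failed_queries']} "
      f"timeout={summary['timeout_queries']} "
      f"check_failed={summary['check_failed']}")

# List queries whose actual status did not match the expected status.
for result in report["detailed_results"]:
    if not result["check_status"]:
        print(f"CHECK FAILED: #{result['seq_id']} {result['query_name']} -> {result['status']}")
```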
@@ -0,0 +1,211 @@
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0

import signal
import sys
import requests
import json
import csv
import logging
from datetime import datetime
import pandas as pd
import argparse
from requests.auth import HTTPBasicAuth
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

"""
Environment: python3

Example to use this script:

python SanityTest.py --base-url ${URL_ADDRESS} --username *** --password *** --input-csv test_queries.csv --output-file test_report --max-workers 2

The input file test_queries.csv should contain the column `query`.

For more details, please use the command:

python SanityTest.py --help

"""

class PPLTester:
    def __init__(self, base_url, username, password, max_workers, timeout, output_file, start_row, end_row, log_level):
        self.base_url = base_url
        self.auth = HTTPBasicAuth(username, password)
        self.headers = { 'Content-Type': 'application/json' }
        self.max_workers = max_workers
        self.timeout = timeout
        self.output_file = output_file
        self.start = start_row - 1 if start_row else None
        self.end = end_row - 1 if end_row else None
        self.log_level = log_level
        self.logger = self._setup_logger()
        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
        self.thread_local = threading.local()
        self.test_results = []

    def _setup_logger(self):
        logger = logging.getLogger('PPLTester')
        logger.setLevel(self.log_level)

        fh = logging.FileHandler('PPL_test.log')
        fh.setLevel(self.log_level)

        ch = logging.StreamHandler()
        ch.setLevel(self.log_level)

        formatter = logging.Formatter(
            '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
        )
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)

        logger.addHandler(fh)
        logger.addHandler(ch)

        return logger

    # Call the PPL endpoint to submit the query
    def submit_query(self, query):
        url = f"{self.base_url}/_plugins/_ppl"
        response_json = None
        payload = {
            "query": query,
        }
        try:
            response = requests.post(url, auth=self.auth, headers=self.headers, json=payload, timeout=self.timeout)
            response_json = response.json()
            response.raise_for_status()
            return response_json
        except Exception as e:
            return {"error": str(e), "response": response_json}

    # Run the test and return the result
    def run_test(self, query, seq_id, query_name, expected_status):
        self.logger.info(f"Starting test: {seq_id}, {query}")
        start_time = datetime.now()
        submit_result = self.submit_query(query)
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        status = "SUCCESS"

        result = {
            "seq_id": seq_id,
            "query_name": query_name,
            "query": query,
            "expected_status": expected_status,
            "status": status,
            "duration": duration,
            "start_time": start_time,
            "end_time": end_time
        }

        if "error" in submit_result:
            self.logger.warning(f"Submit query with error: {submit_result}")
            status = "TIMEOUT" if "read timed out" in submit_result.get("error", "").lower() else "FAILED"
            result["status"] = status
            result["error"] = submit_result
        else:
            self.logger.debug(f"Submit return: {submit_result}")
            result["result"] = submit_result

        check_status = status.lower() == expected_status.lower() if expected_status else False
        result["check_status"] = check_status
        return result

    def run_tests_from_csv(self, csv_file):
        with open(csv_file, 'r') as f:
            reader = csv.DictReader(f)
            queries = [(row['query'], i, row.get('query_name', f'Q{i}'), row.get('expected_status', None)) for i, row in enumerate(reader, start=1) if row['query'].strip()]

        # Filter queries based on start and end rows
        queries = queries[self.start:self.end]

        # Parallel execution
        futures = [self.executor.submit(self.run_test, query, seq_id, query_name, expected_status) for query, seq_id, query_name, expected_status in queries]
        for future in as_completed(futures):
            result = future.result()
            self.test_results.append(result)

    def generate_report(self):
        self.logger.info("Generating report...")
        total_queries = len(self.test_results)
        check_failed_queries = sum(1 for r in self.test_results if not r['check_status'])
        successful_queries = sum(1 for r in self.test_results if r['status'] == 'SUCCESS')
        failed_queries = sum(1 for r in self.test_results if r['status'] == 'FAILED')
        timeout_queries = sum(1 for r in self.test_results if r['status'] == 'TIMEOUT')

        # Create report
        report = {
            "summary": {
                "total_queries": total_queries,
                "check_failed": check_failed_queries,
                "successful_queries": successful_queries,
                "failed_queries": failed_queries,
                "timeout_queries": timeout_queries,
                "execution_time": sum(r['duration'] for r in self.test_results)
            },
            "detailed_results": sorted(self.test_results, key=lambda r: r['seq_id'])
        }

        # Save report to JSON file
        with open(f"{self.output_file}.json", 'w') as f:
            json.dump(report, f, indent=2, default=str)

        # Save results to Excel file
        df = pd.DataFrame(self.test_results)
        df.to_excel(f"{self.output_file}.xlsx", index=False)

        self.logger.info(f"Generated report in {self.output_file}.xlsx and {self.output_file}.json")

def signal_handler(sig, frame, tester):
    print(f"Signal {sig} received, generating report...")
    try:
        tester.executor.shutdown(wait=False, cancel_futures=True)
        tester.generate_report()
    finally:
        sys.exit(0)

def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Run tests from a CSV file and generate a report.")
    parser.add_argument("--base-url", required=True, help="Base URL of the service")
    parser.add_argument("--username", required=True, help="Username for authentication")
    parser.add_argument("--password", required=True, help="Password for authentication")
    parser.add_argument("--input-csv", required=True, help="Path to the CSV file containing test queries")
    parser.add_argument("--output-file", required=True, help="Path to the output report file")
    parser.add_argument("--max-workers", type=int, default=2, help="optional, Maximum number of worker threads (default: 2)")
    parser.add_argument("--timeout", type=int, default=600, help="optional, Timeout in seconds (default: 600)")
    parser.add_argument("--start-row", type=int, default=None, help="optional, The start row of the query to run, start from 1")
    parser.add_argument("--end-row", type=int, default=None, help="optional, The end row of the query to run, not included")
    parser.add_argument("--log-level", default="INFO", help="optional, Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL, default: INFO)")

    args = parser.parse_args()

    tester = PPLTester(
        base_url=args.base_url,
        username=args.username,
        password=args.password,
        max_workers=args.max_workers,
        timeout=args.timeout,
        output_file=args.output_file,
        start_row=args.start_row,
        end_row=args.end_row,
        log_level=args.log_level,
    )

    # Register signal handlers to generate a report on interrupt
    signal.signal(signal.SIGINT, lambda sig, frame: signal_handler(sig, frame, tester))
    signal.signal(signal.SIGTERM, lambda sig, frame: signal_handler(sig, frame, tester))

    # Run tests
    tester.run_tests_from_csv(args.input_csv)

    # Generate report
    tester.generate_report()

if __name__ == "__main__":
    main()
@@ -0,0 +1,3 @@
query_name,query,expected_status
Successful Demo,"source=opensearch_dashboards_sample_data_flights | patterns FlightDelayType | stats count() by patterns_field",SUCCESS
Failed Demo,"source=opensearch_dashboards_sample_data_flights_2 | patterns FlightDelayType | stats count() by patterns_field",FAILED

Review thread on this file:

should we assert result?

It was migrated from the opensearch-spark repo and doesn't currently support checking results. We plan to support that feature later.

That suggests we could assert results by comparing runs with Calcite enabled and disabled in the script.
What is the difference from the existing IT? When should we use the SanityTest script?

Without this script, we need to do the sanity test manually by submitting PPL queries one by one and recording the results.
In the sanity test for 3.0.0, we used the same PPL queries and data as opensearch-spark https://github.com/opensearch-project/opensearch-spark/blob/main/integ-test/script/test_cases.csv. But we plan to build more factual tests next time, which should be the major difference from the existing IT.

We can run IT/YamlIT against a remote cluster:
./gradlew ':integ-test:yamlRestTest' -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername="yamlRestTest-0"
@dai-chen asked a similar question here.
My thoughts:

It seems yamlRestTest can already satisfy our requirements for the sanity test, including results matching. What's your opinion? @LantaoJin @penghuo By the way, does this test framework support concurrency testing?

@penghuo 1. Do we have any doc on how to run the yaml tests (prepare data and queries)? 2. Are the yamlTest and the sanity test equivalent? (Can we sign off the sanity requirement if the yamlTest passes?)

Not sure, I think it should; it is borrowed from opensearch-core.
Data and queries are self-contained for each feature. For instance, in opensearch-core, each aggregation feature is a separate file.
I think so. We can add the sanity test as a yamlTest, run the yamlTest against a remote cluster, then sign off.