-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSQLValidator.py
50 lines (43 loc) · 1.81 KB
/
SQLValidator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import re
class SQLQueryValidator():
def __init__(self):
pass
# list of DDL DML etc and other keywords that can be used for abusing db
prohibited_sql_keywords = [
'ALTER', 'RENAME', 'DELETE', 'UPDATE', 'CREATE', 'TRUNCATE', 'RENAME', 'GRANT', 'COMMENT', 'REVOKE', 'ANALYZE',
'REINDEX', 'CLUSTER', 'INSERT', 'MERGE', 'REPLACE', 'COMMIT', 'ROLLBACK', 'SAVEPOINT', 'SET TRANSACTION', 'SET',
'RESET', 'DISCARD', 'EXEC', 'EXECUTE', 'CALL', 'SHOW', 'LOAD', 'COPY', '1=1', 'pg_sleep', 'BENCHMARK',
'LOAD_FILE', 'SLEEP'
]
def _is_query_safe(self, query : str) -> dict[str, str, bool] :
'''
checks if the query is safe or not
'''
if len(query) > 1000:
return {
"status" : "DANGEROUS",
"reason" : f"query length is greater than 1000 characters ({len(query)})",
"safe" : False
}
query = query.upper()
if r'%25' in query or r'%5C' in query:
return {
"status" : "DANGEROUS",
"reason" : "query contains escape characters",
"safe" : False
}
for k in self.prohibited_sql_keywords:
if re.search(rf'\b{k}\b', query):
return {
"status" : "DANGEROUS",
"reason" : f"query contains prohibited keyword {k}",
"safe" : False
}
return {
"status" : "SAFE",
"reason" : "no potentially dangerous keywords found",
"safe" : True
}
def check_query(self, sql_query : str) -> bool :
res = self._is_query_safe(sql_query)
return True if res["safe"] else False