Skip to content

fix: Use managed read transactions for queries in metadata service #2119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions metadata/metadata_service/proxy/neo4j_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from beaker.cache import CacheManager
from beaker.util import parse_cache_config_options
from flask import current_app, has_app_context
from neo4j import GraphDatabase, Record # noqa: F401
from neo4j import GraphDatabase, Record, Transaction # noqa: F401
from neo4j.api import (SECURITY_TYPE_SECURE,
SECURITY_TYPE_SELF_SIGNED_CERTIFICATE, parse_neo4j_uri)
from neo4j.exceptions import ClientError
Expand Down Expand Up @@ -56,6 +56,17 @@
LOGGER = logging.getLogger(__name__)


def execute_statement(tx: Transaction, stmt: str, params: dict = None) -> List[Record]:
"""
Executes statement against Neo4j. If execution fails, it rollsback and raises exception.
"""
LOGGER.debug('Executing statement: %s with params %s', stmt, params)

result = tx.run(stmt, parameters=params)

return result.data()


def get_single_record(records_list: List[Record]) -> Record:
"""
Helper method to get single item from _execute_cypher_query return when only one item is expected.
Expand Down Expand Up @@ -551,14 +562,16 @@ def _safe_get(self, dct, *keys):
def _execute_cypher_query(self, *,
statement: str,
param_dict: Dict[str, Any]) -> List[Record]:
"""
Execute Cypher queries using managed read transactions
"""
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug('Executing Cypher query: {statement} with params {params}: '.format(statement=statement,
params=param_dict))
start = time.time()
try:
with self._driver.session(database=self._database_name) as session:
result = session.run(query=statement, **param_dict)
return [record for record in result]
return session.read_transaction(execute_statement, statement, param_dict)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make the return the same on the execute_statement function. So rather than doing .data() doing [record for record in result]. I am not sure if using .data() could cause a mismatch in the expected returned data type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes this is a good point, I was having trouble getting the results originally so I thought it needed to be done a different way, but turns out I was just doing it in the wrong spot. just updated this so the original return way is used, but just in the transaction function instead of here


finally:
# TODO: Add support on statsd
Expand Down
2 changes: 1 addition & 1 deletion metadata/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

__version__ = '3.12.0'
__version__ = '3.12.1'

requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
with open(requirements_path) as requirements_file:
Expand Down