From 771cc0fc450d9f12e667c48ed5bab05efa378a27 Mon Sep 17 00:00:00 2001
From: owenlch
Date: Mon, 11 Jul 2022 00:47:13 +0800
Subject: [PATCH 1/2] feat: add get_dashboard support for neptune

Signed-off-by: owenlch
---
 .../metadata_service/proxy/gremlin_proxy.py   | 208 +++++++++++++++++-
 1 file changed, 206 insertions(+), 2 deletions(-)

diff --git a/metadata/metadata_service/proxy/gremlin_proxy.py b/metadata/metadata_service/proxy/gremlin_proxy.py
index 0bf8a2b7db..ee3bd0d582 100644
--- a/metadata/metadata_service/proxy/gremlin_proxy.py
+++ b/metadata/metadata_service/proxy/gremlin_proxy.py
@@ -7,6 +7,7 @@
 from abc import abstractmethod
 from datetime import date, datetime, timedelta
 from operator import attrgetter
+from flask import current_app, has_app_context
 from typing import (Any, Callable, Dict, Iterable, List, Mapping, Optional,
                     Sequence, Set, Tuple, Type, TypeVar, Union, no_type_check,
                     overload)
@@ -21,7 +22,8 @@
 from amundsen_common.models.popular_table import PopularTable
 from amundsen_common.models.table import (Application, Column,
                                           ProgrammaticDescription, Reader,
-                                          Source, Stat, Table, Tag, Watermark)
+                                          Source, Stat, Table, Tag, Watermark,
+                                          Badge)
 from amundsen_common.models.user import User
 from amundsen_gremlin.gremlin_model import (EdgeType, EdgeTypes, VertexType,
                                             VertexTypes, WellKnownProperties)
@@ -57,8 +59,11 @@
 from tornado import httpclient
 from typing_extensions import Protocol  # TODO: it's in typing 3.8
 
+from metadata_service import config
 from metadata_service.entity.dashboard_detail import \
     DashboardDetail as DashboardDetailEntity
+from metadata_service.entity.dashboard_query import \
+    DashboardQuery as DashboardQueryEntity
 from metadata_service.entity.description import Description
 from metadata_service.entity.tag_detail import TagDetail
 from metadata_service.exception import NotFoundException
@@ -1593,12 +1598,211 @@ def delete_resource_relation_by_user(self, *,
                      vertex1_label=VertexTypes.User, vertex1_key=user_id,
                      vertex2_label=vertex_type, vertex2_key=id)
 
+    @staticmethod
+    def _build_user_from_record(record: dict, manager_name: Optional[str] = None) -> User:
+        other_key_values = {}
+        if has_app_context() and current_app.config[config.USER_OTHER_KEYS]:
+            for k in current_app.config[config.USER_OTHER_KEYS]:
+                if k in record:
+                    other_key_values[k] = record[k]
+
+        return User(email=record['email'],
+                    user_id=record.get('user_id', record['email']),
+                    first_name=record.get('first_name'),
+                    last_name=record.get('last_name'),
+                    full_name=record.get('full_name'),
+                    is_active=record.get('is_active', True),
+                    profile_url=record.get('profile_url'),
+                    github_username=record.get('github_username'),
+                    team_name=record.get('team_name'),
+                    slack_id=record.get('slack_id'),
+                    employee_type=record.get('employee_type'),
+                    role_name=record.get('role_name'),
+                    manager_fullname=record.get('manager_fullname', manager_name),
+                    other_key_values=other_key_values)
+
+    def _make_badges(self, badges: Iterable) -> List[Badge]:
+        """
+        Generates a list of Badges objects
+        :param badges: A list of badges of a table, column, or type_metadata
+        :return: a list of Badge objects
+        """
+        _badges = []
+        for badge in badges:
+            _badges.append(Badge(badge_name=badge["key"], category=badge["category"]))
+        return _badges
+
+    def _get_dashboard_vertex(self,
+                              dashboard_uri: str,
+                              ) -> Tuple[str, str, str, int]:
+        """
+        Helper function to get the dashboard vertex
+        :param dashboard_uri: dashboard URI that is sent from frontend
+        :return: Tuple of 3 String and 1 int, for dashboard uri, url, name, and created timestamp
+        """
+        dashboard = self.g.V().has("key", dashboard_uri).valueMap().by(__.unfold()).toList()
+        if len(dashboard) > 1:
+            raise Exception(f"More than one Dashboard Found with ID {dashboard_uri}.")
+        if len(dashboard) == 0:
+            raise NotFoundException(f"No Dashboard exist with URI : {dashboard_uri}.")
+
+        dashboard_dict = dashboard[0]
+        dashboard_uri = dashboard_dict.get("key", "")
+        dashboard_url = dashboard_dict.get("dashboard_url", "")
+        dashboard_name = dashboard_dict.get("name", "")
+        dashboard_created_timestamp = int(dashboard_dict.get("created_timestamp"))
+        return dashboard_uri, dashboard_url, dashboard_name, dashboard_created_timestamp
+
+    def _get_dashboard_group_and_cluster(self,
+                                         dashboard_uri: str,
+                                         ) -> Tuple[str, str, str, str]:
+        """
+        Helper function to get the dashboard group and cluster
+        :param dashboard_uri: dashboard URI that is sent from frontend
+        :return: Tuple of 4 String
+        """
+        dashboard_group = self.g.V().has("key", dashboard_uri).out("DASHBOARD_OF").valueMap().by(__.unfold()).toList()
+        if dashboard_group:
+            dashboard_group_name = dashboard_group[0].get("name", "")
+            dashboard_group_url = dashboard_group[0].get("dashboard_group_url", "")
+        else:
+            dashboard_group_name = ""
+            dashboard_group_url = ""
+
+        dashboard_cluster = self.g.V().has("key", dashboard_uri).out("DASHBOARD_OF")
+        dashboard_cluster = dashboard_cluster.out("DASHBOARD_GROUP_OF").valueMap().by(__.unfold()).toList()
+        if dashboard_cluster:
+            cluster_name = dashboard_cluster[0].get("name", "")
+            product_name = dashboard_cluster[0].get("key", "").split("_")[0]
+        else:
+            cluster_name = ""
+            product_name = ""
+        return dashboard_group_name, dashboard_group_url, cluster_name, product_name
+
+    def _get_dashboard_tables(self,
+                              dashboard_uri: str,
+                              ) -> List[PopularTable]:
+        """
+        Helper function to get the dashboard tables
+        :param dashboard_uri: dashboard URI that is sent from frontend
+        :return: List of PopularTable
+        """
+
+        dashboard_tables = self.g.V().has("key", dashboard_uri).out("DASHBOARD_WITH_TABLE")
+        dashboard_tables = dashboard_tables.valueMap(True).by(__.unfold()).toList()
+        tables = []
+        for table in dashboard_tables:
+            table_base = table.get('key').split("://")[1]
+            table_db = table.get('key').split("://")[0]
+            table_cluster = table_base.split(".")[0]
+            table_name = table.get('name')
+            table_schema = table_base.split(".", 1)[1].split("/", 1)[0]
+            table_desc_vertex = self.g.V(table[T.id]).out("DESCRIPTION")
+            table_desc_vertex = table_desc_vertex.filter(__.hasLabel("Description")).valueMap().by(__.unfold()).toList()
+            if table_desc_vertex:
+                for desc in table_desc_vertex:
+                    table_desc = desc.get('description')
+            else:
+                table_desc = ""
+            table_dict = {"schema": table_schema, "cluster": table_cluster,
+                          "database": table_db, "name": table_name, "description": table_desc}
+            tables.append(PopularTable(**table_dict))
+        return tables
+
     @timer_with_counter
     @overrides
     def get_dashboard(self,
                       dashboard_uri: str,
                       ) -> DashboardDetailEntity:
-        pass
+        '''
+        Retrieves the Dashboard information based on the specified dashboard uri.
+        :param dashboard_uri: dashboard URI that is sent from frontend
+        :return: The DashboardDetailEntity object
+        '''
+        dashboard_uri, dashboard_url, dashboard_name, dashboard_created_timestamp = \
+            self._get_dashboard_vertex(dashboard_uri)
+
+        dashboard_group_name, dashboard_group_url, cluster_name, product_name = \
+            self._get_dashboard_group_and_cluster(dashboard_uri)
+
+        dashboard_desc = self.g.V().has("key", dashboard_uri).out("DESCRIPTION")
+        dashboard_desc = dashboard_desc.valueMap().by(__.unfold()).toList()
+        if dashboard_desc:
+            dashboard_description = dashboard_desc[0].get("description", "")
+        else:
+            dashboard_description = ""
+
+        owners = []
+        dashboard_owners = self.g.V().has("key", dashboard_uri).out("OWNER")
+        dashboard_owners = dashboard_owners.dedup().valueMap().by(__.unfold()).toList()
+        for owner in dashboard_owners:
+            owner_data = self._get_user_details(user_id=owner["email"], user_data=owner)
+            owners.append(self._build_user_from_record(record=owner_data))
+
+        dashboard_tags = self.g.V().has("key", dashboard_uri).out("TAGGED_BY").valueMap().by(__.unfold()).toList()
+        tags = [Tag(tag_type=tag['tag_type'], tag_name=tag['key']) for tag in dashboard_tags]
+
+        dashboard_badges = self.g.V().has("key", dashboard_uri).out("HAS_BADGE").valueMap().by(__.unfold()).toList()
+        badges = self._make_badges(dashboard_badges)
+
+        dashboard_charts = self.g.V().has("key", dashboard_uri).out("HAS_CHART").valueMap().by(__.unfold()).toList()
+        chart = [chart['name'] for chart in dashboard_charts if 'name' in chart and chart['name']]
+
+        dashboard_query = self.g.V().has("key", dashboard_uri).out("HAS_QUERY").valueMap().by(__.unfold()).toList()
+        query_names = [query['name'] for query in dashboard_query if 'name' in query and query['name']]
+        queries = []
+        for query in dashboard_query:
+            if query.get('name') or query.get('url') or query.get('query_text'):
+                query_dict = {'name': query.get('name'), 'url': query.get('url'), 'query_text': query.get('query_text')}
+                queries.append(DashboardQueryEntity(**query_dict))
+
+        tables = self._get_dashboard_tables(dashboard_uri)
+
+        last_successful_run_timestamp = None
+        last_run_timestamp = None
+        last_run_state = None
+        dashboard_execution = self.g.V().has("key", dashboard_uri).out("EXECUTED").valueMap().by(__.unfold()).toList()
+
+        for execution in dashboard_execution:
+            if "last_successful_execution" in execution.get("key"):
+                last_successful_run_timestamp = int(execution.get("timestamp"))
+            if "last_execution" in execution.get("key"):
+                last_run_timestamp = int(execution.get("timestamp"))
+                last_run_state = execution.get("state")
+
+        updated_timestamp = None
+        dashboard_last_update = self.g.V().has("key", dashboard_uri).out("LAST_UPDATED_AT")
+        dashboard_last_update = dashboard_last_update.valueMap().by(__.unfold()).toList()
+        for last_update in dashboard_last_update:
+            updated_timestamp = int(last_update.get("timestamp"))
+
+        view_count = 0
+        dashboard_view_count = self.g.V().has("key", dashboard_uri).outE("READ_BY").valueMap().by(__.unfold()).toList()
+        for view in dashboard_view_count:
+            view_count += view.get("read_count") or 0
+
+        return DashboardDetailEntity(uri=dashboard_uri,
+                                     cluster=cluster_name,
+                                     url=dashboard_url,
+                                     name=dashboard_name,
+                                     product=product_name,
+                                     created_timestamp=dashboard_created_timestamp,
+                                     description=dashboard_description,
+                                     group_name=dashboard_group_name,
+                                     group_url=dashboard_group_url,
+                                     last_successful_run_timestamp=last_successful_run_timestamp,
+                                     last_run_timestamp=last_run_timestamp,
+                                     last_run_state=last_run_state,
+                                     updated_timestamp=updated_timestamp,
+                                     owners=owners,
+                                     tags=tags,
+                                     badges=badges,
+                                     recent_view_count=view_count,
+                                     chart_names=chart,
+                                     query_names=query_names,
+                                     queries=queries,
+                                     tables=tables
+                                     )
 
     @timer_with_counter
     @overrides

From 278c9c1749b9e31257db50c1f0493aec1c643270 Mon Sep 17 00:00:00 2001
From: owenlch
Date: Mon, 11 Jul 2022 01:05:48 +0800
Subject: [PATCH 2/2] Update gremlin_proxy.py

Signed-off-by: owenlch
---
 metadata/metadata_service/proxy/gremlin_proxy.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/metadata/metadata_service/proxy/gremlin_proxy.py b/metadata/metadata_service/proxy/gremlin_proxy.py
index ee3bd0d582..7463336723 100644
--- a/metadata/metadata_service/proxy/gremlin_proxy.py
+++ b/metadata/metadata_service/proxy/gremlin_proxy.py
@@ -7,7 +7,6 @@
 from abc import abstractmethod
 from datetime import date, datetime, timedelta
 from operator import attrgetter
-from flask import current_app, has_app_context
 from typing import (Any, Callable, Dict, Iterable, List, Mapping, Optional,
                     Sequence, Set, Tuple, Type, TypeVar, Union, no_type_check,
                     overload)
@@ -20,10 +19,9 @@
 from amundsen_common.models.generation_code import GenerationCode
 from amundsen_common.models.lineage import Lineage, LineageItem
 from amundsen_common.models.popular_table import PopularTable
-from amundsen_common.models.table import (Application, Column,
+from amundsen_common.models.table import (Application, Badge, Column,
                                           ProgrammaticDescription, Reader,
-                                          Source, Stat, Table, Tag, Watermark,
-                                          Badge)
+                                          Source, Stat, Table, Tag, Watermark)
 from amundsen_common.models.user import User
 from amundsen_gremlin.gremlin_model import (EdgeType, EdgeTypes, VertexType,
                                             VertexTypes, WellKnownProperties)
@@ -36,6 +34,7 @@
 from amundsen_gremlin.script_translator import (
     ScriptTranslator, ScriptTranslatorTargetJanusgraph)
 from amundsen_gremlin.test_and_development_shard import get_shard
+from flask import current_app, has_app_context
 from gremlin_python.driver.client import Client
 from gremlin_python.driver.driver_remote_connection import \
     DriverRemoteConnection