From ee24056aac37c3b59bcb08bf8806066960080a79 Mon Sep 17 00:00:00 2001 From: acezen Date: Mon, 19 Jun 2023 20:11:23 +0800 Subject: [PATCH 1/7] Prototype for load_from and write_to Signed-off-by: acezen Add doc Update Update Unify the API Runnable Signed-off-by: acezen Committed-by: acezen from Dev container Committed-by: acezen from Dev container Committed-by: acezen from Dev container Rebase Signed-off-by: acezen Committed-by: acezen from Dev container Format Committed-by: acezen from Dev container Fix Committed-by: acezen from Dev container Update Committed-by: acezen from Dev container Fix Signed-off-by: acezen Committed-by: acezen from Dev container Fix Committed-by: acezen from Dev container Committed-by: acezen from Dev container Committed-by: acezen from Dev container Committed-by: acezen from Dev container Format Committed-by: acezen from Dev container Committed-by: acezen from Dev container Update Committed-by: acezen from Dev container Committed-by: acezen from Dev container Update Format Committed-by: acezen from Dev container --- .../frame/property_graph_frame.cc | 2 - coordinator/gscoordinator/op_executor.py | 13 ++ proto/types.proto | 2 + python/graphscope/__init__.py | 1 - python/graphscope/client/session.py | 6 - python/graphscope/framework/dag_utils.py | 41 +++-- python/graphscope/framework/graph.py | 143 +++++++++++------- python/graphscope/framework/graph_builder.py | 55 ------- python/graphscope/framework/utils.py | 113 +++++++++++++- .../unittest/{test_gar.py => test_graphar.py} | 28 ++-- 10 files changed, 256 insertions(+), 148 deletions(-) rename python/graphscope/tests/unittest/{test_gar.py => test_graphar.py} (53%) diff --git a/analytical_engine/frame/property_graph_frame.cc b/analytical_engine/frame/property_graph_frame.cc index 6ca408c963df..e5a62bafcc40 100644 --- a/analytical_engine/frame/property_graph_frame.cc +++ b/analytical_engine/frame/property_graph_frame.cc @@ -112,8 +112,6 @@ LoadGraph(const grape::CommSpec& comm_spec, vineyard::Client& client, #ifdef ENABLE_GAR BOOST_LEAF_AUTO(graph_info_path, params.Get(gs::rpc::GRAPH_INFO_PATH)); - BOOST_LEAF_ASSIGN(generate_eid, params.Get(gs::rpc::GENERATE_EID)); - BOOST_LEAF_ASSIGN(retain_oid, params.Get(gs::rpc::RETAIN_OID)); using loader_t = vineyard::gar_fragment_loader_t; loader_t loader(client, comm_spec, graph_info_path); diff --git a/coordinator/gscoordinator/op_executor.py b/coordinator/gscoordinator/op_executor.py index 7b43a1084760..b4d8be03a857 100644 --- a/coordinator/gscoordinator/op_executor.py +++ b/coordinator/gscoordinator/op_executor.py @@ -726,6 +726,7 @@ def _process_serialize_graph(self, op: op_def_pb2.OpDef): "\n" ) storage_options = json.loads(op.attr[types_pb2.STORAGE_OPTIONS].s.decode()) + serialization_options = json.loads(op.attr[types_pb2.SERIALIZATION_OPTIONS].s.decode()) vineyard_endpoint = self._launcher.vineyard_endpoint vineyard_ipc_socket = self._launcher.vineyard_socket deployment, hosts = self._launcher.get_vineyard_stream_info() @@ -738,6 +739,7 @@ def _process_serialize_graph(self, op: op_def_pb2.OpDef): vineyard_ipc_socket=vineyard_ipc_socket, vineyard_endpoint=vineyard_endpoint, storage_options=storage_options, + serialization_options=serialization_options, deployment=deployment, hosts=hosts, ) @@ -758,8 +760,18 @@ def _process_deserialize_graph(self, op: op_def_pb2.OpDef): "\n" ) storage_options = json.loads(op.attr[types_pb2.STORAGE_OPTIONS].s.decode()) +<<<<<<< HEAD vineyard_endpoint = self._launcher.vineyard_endpoint vineyard_ipc_socket = self._launcher.vineyard_socket +======= + deseralization_options = json.loads(op.attr[types_pb2.DESERIALIZATION_OPTIONS].s.decode()) + engine_config = self.get_analytical_engine_config() + if self._launcher.type() == types_pb2.HOSTS: + vineyard_endpoint = engine_config["vineyard_rpc_endpoint"] + else: + vineyard_endpoint = self._launcher._vineyard_internal_endpoint + vineyard_ipc_socket = engine_config["vineyard_socket"] +>>>>>>> 54ddef33 (Prototype for load_from and write_to) deployment, hosts = self._launcher.get_vineyard_stream_info() path = op.attr[types_pb2.GRAPH_SERIALIZATION_PATH].s.decode() graph_id = vineyard.io.deserialize( @@ -768,6 +780,7 @@ def _process_deserialize_graph(self, op: op_def_pb2.OpDef): vineyard_ipc_socket=vineyard_ipc_socket, vineyard_endpoint=vineyard_endpoint, storage_options=storage_options, + deseralization_options=deseralization_options, deployment=deployment, hosts=hosts, ) diff --git a/proto/types.proto b/proto/types.proto index 82d9f69e1811..b99d940ead74 100644 --- a/proto/types.proto +++ b/proto/types.proto @@ -249,6 +249,8 @@ enum ParamKey { FD = 323; // file descriptor SOURCE = 324; WRITE_OPTIONS = 325; + SERIALIZATION_OPTIONS = 326; + DESERIALIZATION_OPTIONS = 327; // large attr CHUNK_NAME = 341; diff --git a/python/graphscope/__init__.py b/python/graphscope/__init__.py index a3ed3ea76005..8af5851674ed 100644 --- a/python/graphscope/__init__.py +++ b/python/graphscope/__init__.py @@ -50,7 +50,6 @@ from graphscope.framework.errors import * from graphscope.framework.graph import Graph from graphscope.framework.graph_builder import load_from -from graphscope.framework.graph_builder import load_from_gar from graphscope.version import __version__ __doc__ = """ diff --git a/python/graphscope/client/session.py b/python/graphscope/client/session.py index 56e7b8faf26a..7fc6f23979bc 100755 --- a/python/graphscope/client/session.py +++ b/python/graphscope/client/session.py @@ -1162,12 +1162,6 @@ def load_from(self, *args, **kwargs): with default_session(self): return graphscope.load_from(*args, **kwargs) - def load_from_gar(self, *args, **kwargs): - """Load a graph from gar format files within the session. - See more information in :meth:`graphscope.load_from_gar`. - """ - with default_session(self): - return graphscope.load_from_gar(*args, **kwargs) @deprecated("Please use `sess.interactive` instead.") def gremlin(self, graph, params=None): diff --git a/python/graphscope/framework/dag_utils.py b/python/graphscope/framework/dag_utils.py index f8fe467a516e..44aa936615e1 100644 --- a/python/graphscope/framework/dag_utils.py +++ b/python/graphscope/framework/dag_utils.py @@ -1110,25 +1110,30 @@ def archive_graph(graph, path): return op -def save_graph_to( - graph, - path: str, - vineyard_id, - **kwargs, +def serialize_graph( + graph, path: str, storage_options: dict, serialization_options: dict ): """Serialize graph to the specified location + The meta and data of graph is dumped to specified location, + and can be restored by `Graph.load_from` in other sessions. + Each worker will write a `path_{worker_id}.meta` file and + a `path_{worker_id}` file to storage. Args: graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source graph. - path (str): The path to serialize the graph, on each worker. + path (str): The path to serialize the graph, on each worker, supported + storages are local, hdfs, oss, s3 Returns: An op to serialize the graph to a path. """ config = { types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path), - types_pb2.VINEYARD_ID: utils.i_to_attr(vineyard_id), - types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)), + types_pb2.VINEYARD_ID: utils.i_to_attr(graph._vineyard_id), + types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(storage_options)), + types_pb2.SERIALIZATION_OPTIONS: utils.s_to_attr( + json.dumps(serialization_options) + ), } op = Operation( graph.session_id, @@ -1140,10 +1145,26 @@ def save_graph_to( return op -def load_graph_from(path: str, sess, **kwargs): +def deserialize_graph( + path: str, sess, storage_options: dict, deserialization_options: dict +): + """Deserialize graph from the specified location. + + Args: + path (str): The path contains the serialization files. + sess (`graphscope.Session`): The target session + that the graph will be construct in. + + Returns: + `Graph`: A new graph object. Schema and data is supposed to be + identical with the one that called serialized method. + """ config = { types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path), - types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)), + types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(storage_options)), + types_pb2.DESERIALIZATION_OPTIONS: utils.s_to_attr( + json.dumps(deserialization_options) + ), } op = Operation( sess.session_id, diff --git a/python/graphscope/framework/graph.py b/python/graphscope/framework/graph.py index 281ca3ddf54d..0a076b416df0 100644 --- a/python/graphscope/framework/graph.py +++ b/python/graphscope/framework/graph.py @@ -28,6 +28,7 @@ from typing import Mapping from typing import Tuple from typing import Union +from urllib.parse import urlparse try: import vineyard @@ -116,9 +117,6 @@ def save_to(self, path, **kwargs): def load_from(cls, path, sess, **kwargs): raise NotImplementedError - def archive(self, path, **kwargs): - raise NotImplementedError - @abstractmethod def project(self, vertices, edges): raise NotImplementedError @@ -457,16 +455,6 @@ def to_dataframe(self, selector, vertex_range=None): op = dag_utils.graph_to_dataframe(self, selector, vertex_range) return ResultDAGNode(self, op) - def archive(self, path): - """Archive the graph to gar format with graph yaml file path. - - Args: - path (str): The graph yaml file path describe how to archive the graph. - """ - check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY) - op = dag_utils.archive_graph(self, path) - return ArchivedGraph(self._session, op) - def to_directed(self): op = dag_utils.to_directed(self) graph_dag_node = GraphDAGNode(self._session, op) @@ -1164,49 +1152,101 @@ def _check_unmodified(self): self.signature == self._saved_signature, "Graph has been modified!" ) - def save_to(self, path, **kwargs): - """Serialize graph to a location. - The meta and data of graph is dumped to specified location, - and can be restored by `Graph.load_from` in other sessions. - - Each worker will write a `path_{worker_id}.meta` file and - a `path_{worker_id}` file to storage. - Args: - path (str): supported storages are local, hdfs, oss, s3 - """ - - op = dag_utils.save_graph_to(self, path, self._vineyard_id, **kwargs) - self._session.dag.add_op(op) - return self._session._wrapper(op) + @staticmethod + def _load_from_graphar(path, sess, **kwargs): + # graphar now only support global vertex map. + vertex_map = utils.vertex_map_type_to_enum("global") + config = { + types_pb2.OID_TYPE: utils.s_to_attr( + "int64_t" + ), # grahar use vertex index as oid, so it always be int64_t + types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"), + types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False), + types_pb2.IS_FROM_GAR: utils.b_to_attr(True), + types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(vertex_map), + types_pb2.COMPACT_EDGES: utils.b_to_attr(False), + types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(path), + } + op = dag_utils.create_graph( + sess.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=[], attrs=config + ) + return sess._wrapper(GraphDAGNode(sess, op)) @classmethod - def load_from(cls, path, sess, **kwargs): - """Construct a `Graph` by deserialize from `path`. - It will read all serialization files, which is dumped by - `Graph.serialize`. - If any serialize file doesn't exists or broken, will error out. + def load_from(cls, uristring, sess=None, **kwargs): + """Load a ArrowProperty graph from with a certain data source. The data source + can be vineyard serialized files or graphar files. Args: - path (str): Path contains the serialization files. - sess (`graphscope.Session`): The target session - that the graph will be construct in - + uristring (str): URI contains the description of the data source or + path contains the serialization files, + example: "graphar+file:///tmp/graphar/xxx" + sess (`graphscope.Session`): The target session that the graph + will be construct, if None, use the default session. + kwargs: Other arguments that will be passed to the data source loader. Returns: - `Graph`: A new graph object. Schema and data is supposed to be - identical with the one that called serialized method. + `Graph`: A new graph object. """ - op = dag_utils.load_graph_from(path, sess, **kwargs) - return sess._wrapper(GraphDAGNode(sess, op)) + from graphscope.client.session import get_default_session + + if sess is None: + sess = get_default_session() + uri = urlparse(uristring) + if uri.scheme and "+" in uri.scheme: + source = uri.scheme.split("+")[0] + path = uri.scheme.split("+")[-1] + "://" + uri.netloc + uri.path + if source == "graphar": + return cls._load_from_graphar(path, sess) + else: + raise ValueError("Unknown source: %s" % source) + else: + # not a uri string, assume it is a path for deserialization + storage_options = kwargs.pop("storage_options", {}) + deserialization_options = kwargs.pop("deserialization_options", {}) + op = dag_utils.deserialize_graph( + uristring, sess, storage_options, deserialization_options + ) + return sess._wrapper(GraphDAGNode(sess, op)) - def archive(self, path): - """Archive graph gar format files base on the graph info. - The meta and data of graph is dumped to specified location, - and can be restored by `Graph.deserialize` in other sessions. + def save_to( + self, + path, + format="serialization", + **kwargs, + ): + """Save graph to specified location with specified format. Args: - path (str): the graph info file path. + path (str): the directory path to write graph. + format (str): the format to write graph, default is "serialization". + kwargs: Other arguments that will be passed to the data source + saver. + + Return (dict): A dict contains the type and uri string of output data. """ - return self._session._wrapper(self._graph_node.archive(path)) + if format == "graphar": + graphar_options = kwargs.pop("graphar_options", {}) + graph_info_path = utils.generate_graphar_info_from_schema( + path, + self._schema, + graphar_options, + ) + op = dag_utils.save_to_graphar(self, graph_info_path) + self._session.dag.add_op(op) + self._session._wrapper(op) + return {"type": format, "uri": "graphar+" + graph_info_path} + elif format == "serialization": + # serialize graph + storage_options = kwargs.pop("storage_options", {}) + serialization_options = kwargs.pop("serialization_options", {}) + op = dag_utils.serialize_graph( + self, path, storage_options, serialization_options + ) + self._session.dag.add_op(op) + self._session._wrapper(op) + return {"type": format, "uri": path} + else: + raise ValueError("Unknown format: %s" % format) @apply_docstring(GraphDAGNode.add_vertices) def add_vertices( @@ -1299,14 +1339,3 @@ def __init__(self, session, op): self._op = op # add op to dag self._session.dag.add_op(self._op) - - -class ArchivedGraph(DAGNode): - """Archived graph node in a DAG""" - - def __init__(self, session, op): - super().__init__() - self._session = session - self._op = op - # add op to dag - self._session.dag.add_op(self._op) diff --git a/python/graphscope/framework/graph_builder.py b/python/graphscope/framework/graph_builder.py index fc1a6e4cb07b..3a3245daa288 100644 --- a/python/graphscope/framework/graph_builder.py +++ b/python/graphscope/framework/graph_builder.py @@ -217,58 +217,3 @@ def load_from( use_perfect_hash=use_perfect_hash, ) return graph - - -def load_from_gar( - graph_info_path: str, - directed=True, - oid_type="int64_t", - vid_type="uint64_t", - vertex_map="global", - compact_edges=False, - use_perfect_hash=False, -) -> Graph: - sess = get_default_session() - oid_type = utils.normalize_data_type_str(oid_type) - if oid_type not in ("int32_t", "int64_t", "std::string"): - raise ValueError("The 'oid_type' can only be int32_t, int64_t or string.") - vid_type = utils.normalize_data_type_str(vid_type) - if vid_type not in ("uint32_t", "uint64_t"): - raise ValueError("The 'vid_type' can only be uint32_t or uint64_t.") - if compact_edges: - raise ValueError( - "Loading from gar with 'compact_edges' hasn't been supported yet." - ) - if use_perfect_hash: - raise ValueError( - "Loading from gar with 'use_perfect_hash' hasn't been supported yet." - ) - # generate and add a loader op to dag - vertex_map = utils.vertex_map_type_to_enum(vertex_map) - # construct create graph op - config = { - types_pb2.DIRECTED: utils.b_to_attr(directed), - types_pb2.OID_TYPE: utils.s_to_attr(oid_type), - types_pb2.VID_TYPE: utils.s_to_attr(vid_type), - types_pb2.GENERATE_EID: utils.b_to_attr(False), - types_pb2.RETAIN_OID: utils.b_to_attr(False), - types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False), - types_pb2.IS_FROM_GAR: utils.b_to_attr(True), - types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(vertex_map), - types_pb2.COMPACT_EDGES: utils.b_to_attr(compact_edges), - types_pb2.USE_PERFECT_HASH: utils.b_to_attr(use_perfect_hash), - types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(graph_info_path), - } - op = dag_utils.create_graph( - sess.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=[], attrs=config - ) - graph = sess.g( - op, - oid_type=oid_type, - vid_type=vid_type, - directed=directed, - vertex_map=vertex_map, - compact_edges=compact_edges, - use_perfect_hash=use_perfect_hash, - ) - return graph diff --git a/python/graphscope/framework/utils.py b/python/graphscope/framework/utils.py index 94214241b22b..8cb47eb89c4c 100644 --- a/python/graphscope/framework/utils.py +++ b/python/graphscope/framework/utils.py @@ -31,10 +31,12 @@ import warnings from queue import Empty from queue import Queue +from urllib.parse import urlparse import numpy as np import pandas as pd import psutil +import yaml from google.protobuf.any_pb2 import Any from graphscope.client.archive import OutArchive @@ -694,11 +696,110 @@ def wrapper(*args, **kwargs): return decorator -def apply_docstring(fn): - """Apply the docstring of `fn` to annotated function.""" +def generate_graphar_info_from_schema(path, schema, graphar_options): + import copy - def decorator(func): - func.__doc__ = fn.__doc__ - return func + class Dumper(yaml.Dumper): + def increase_indent(self, flow=False, indentless=False): + return super(Dumper, self).increase_indent(flow, False) - return decorator + def PbDataType2InfoType(str): + if str == "INT": + return "int32" + elif str == "LONG": + return "int64" + elif str == "STRING": + return "string" + elif str == "BOOL": + return "bool" + else: + raise ValueError("Invlid type name {}".format(str)) + + # if not urlparse(path).scheme: + # path = "file://" + path + graph_name = graphar_options.get("graph_name", "graph") + vertex_block_size = graphar_options.get("vertex_block_size", 262144) # 2^18 + edge_block_size = graphar_options.get("edge_block_size", 4194304) # 2^22 + file_type = graphar_options.get("file_type", "parquet") + version = graphar_options.get("version", "v1") + graph_info = dict() + graph_info["name"] = graph_name + graph_info["version"] = "gar/{}".format(version) + # process vertex info + graph_info["vertices"] = [] + graph_info["edges"] = [] + for vertex_label in schema.vertex_labels: + info = dict() + info["label"] = vertex_label + info["chunk_size"] = vertex_block_size + info["prefix"] = "vertex/" + vertex_label + "/" + info["version"] = "gar/{}".format(version) + info["property_groups"] = [{"properties": [], "file_type": file_type}] + for property in schema.get_vertex_properties(vertex_label): + info["property_groups"][0]["properties"].append( + { + "name": property.name, + "data_type": PbDataType2InfoType( + graph_def_pb2.DataTypePb.Name(property.data_type) + ), + "is_primary": True if property.name == "id" else False, + } + ) + output_path = os.path.join(path, vertex_label + ".vertex.yml") + with open(output_path, "w") as f: + yaml.dump(info, f, Dumper=Dumper, default_flow_style=False) + graph_info["vertices"].append(vertex_label + ".vertex.yml") + # process edge info + for edge_label in schema.edge_labels: + properties = [] + for property in schema.get_edge_properties(edge_label): + properties.append( + { + "name": property.name, + "data_type": PbDataType2InfoType( + graph_def_pb2.DataTypePb.Name(property.data_type) + ), + "is_primary": False, + } + ) + csr_adj_list = { + "file_type": file_type, + "property_groups": [ + {"properties": copy.deepcopy(properties), "file_type": file_type} + ], + "ordered": True, + "aligned_by": "src", + } + csc_adj_list = { + "file_type": file_type, + "property_groups": [ + {"properties": copy.deepcopy(properties), "file_type": file_type} + ], + "ordered": True, + "aligned_by": "dst", + } + for r in schema.get_relationships(edge_label): + info = dict() + info["prefix"] = ( + "edge/" + r.source + "_" + edge_label + "_" + r.destination + "/" + ) + info["edge_label"] = edge_label + info["src_label"] = r.source + info["dst_label"] = r.destination + info["chunk_size"] = edge_block_size + info["src_chunk_size"] = vertex_block_size + info["dst_chunk_size"] = vertex_block_size + info["version"] = "gar/{}".format(version) + info["adj_lists"] = [csr_adj_list, csc_adj_list] + output_path = os.path.join( + path, r.source + "_" + edge_label + "_" + r.destination + ".edge.yml" + ) + with open(output_path, "w") as f: + yaml.dump(info, f, Dumper=Dumper, default_flow_style=False) + graph_info["edges"].append( + r.source + "_" + edge_label + "_" + r.destination + ".edge.yml" + ) + graph_info_path = os.path.join(path, graph_name + ".graph.yml") + with open(graph_info_path, "w") as f: + yaml.dump(graph_info, f, Dumper=Dumper, default_flow_style=False) + return graph_info_path diff --git a/python/graphscope/tests/unittest/test_gar.py b/python/graphscope/tests/unittest/test_graphar.py similarity index 53% rename from python/graphscope/tests/unittest/test_gar.py rename to python/graphscope/tests/unittest/test_graphar.py index 7a2281bd76da..6c34a7cd0de5 100644 --- a/python/graphscope/tests/unittest/test_gar.py +++ b/python/graphscope/tests/unittest/test_graphar.py @@ -18,23 +18,29 @@ import os -import pytest +from graphscope.framework.graph import Graph -gar_test_repo_dir = os.path.expandvars("${GS_TEST_DIR}") +graphar_test_repo_dir = os.path.expandvars("${GS_TEST_DIR}") @pytest.mark.skip(reason="Issue 3162") -def test_load_from_gar(graphscope_session): +def test_load_from_graphar(graphscope_session): graph_yaml = os.path.join( - gar_test_repo_dir, "graphar/ldbc_sample/parquet/ldbc_sample.graph.yml" + graphar_test_repo_dir, "graphar/ldbc_sample/parquet/ldbc_sample.graph.yml" ) - print(graph_yaml) - graph = graphscope_session.load_from_gar(graph_yaml) - assert graph.schema is not None - del graph + graph_yaml_path = "graphar+file://" + graph_yaml + print(graph_yaml_path) + g = Graph.load_from(graph_yaml_path, graphscope_session) + assert g.schema is not None + del g @pytest.mark.skip(reason="Issue 3162") -def test_archive_to_gar(ldbc_graph): - graph_yaml = os.path.join(gar_test_repo_dir, "graphar/ldbc/ldbc.graph.yml") - ldbc_graph.archive(graph_yaml) +def test_save_to_graphar(ldbc_graph): + graphar_options = { + "graph_name": "ldbc_sample", + "file_type": "orc", + "vertex_block_size": 256, + "edge_block_size": 1024, + } + ldbc_graph.save_to("/tmp/", format="graphar", graphar_options=graphar_options) From 173c2dc8d34fd87c84d8b42e598eaeb419c266d7 Mon Sep 17 00:00:00 2001 From: acezen Date: Mon, 9 Oct 2023 11:01:37 +0800 Subject: [PATCH 2/7] Support string oid for graphar Committed-by: acezen from Dev container Committed-by: acezen from Dev container --- python/graphscope/framework/dag_utils.py | 2 +- python/graphscope/framework/graph.py | 3 ++- python/graphscope/framework/utils.py | 33 ++++++++++++++++++++++++ 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/python/graphscope/framework/dag_utils.py b/python/graphscope/framework/dag_utils.py index 44aa936615e1..e3fa067693bf 100644 --- a/python/graphscope/framework/dag_utils.py +++ b/python/graphscope/framework/dag_utils.py @@ -1081,7 +1081,7 @@ def gremlin_to_subgraph( return op -def archive_graph(graph, path): +def save_to_graphar(graph, path): """Archive a graph to gar format with a path. Args: diff --git a/python/graphscope/framework/graph.py b/python/graphscope/framework/graph.py index 0a076b416df0..fc763f08038f 100644 --- a/python/graphscope/framework/graph.py +++ b/python/graphscope/framework/graph.py @@ -1156,9 +1156,10 @@ def _check_unmodified(self): def _load_from_graphar(path, sess, **kwargs): # graphar now only support global vertex map. vertex_map = utils.vertex_map_type_to_enum("global") + oid_type = utils.get_oid_type_from_graph_info(path) config = { types_pb2.OID_TYPE: utils.s_to_attr( - "int64_t" + oid_type ), # grahar use vertex index as oid, so it always be int64_t types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"), types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False), diff --git a/python/graphscope/framework/utils.py b/python/graphscope/framework/utils.py index 8cb47eb89c4c..0a4731a73e39 100644 --- a/python/graphscope/framework/utils.py +++ b/python/graphscope/framework/utils.py @@ -803,3 +803,36 @@ def PbDataType2InfoType(str): with open(graph_info_path, "w") as f: yaml.dump(graph_info, f, Dumper=Dumper, default_flow_style=False) return graph_info_path + +def get_oid_type_from_graph_info(path): + if "file://" in path: + path = path.replace("file://", "") + with open(path, "r") as f: + graph_info = yaml.safe_load(f) + if "vertices" not in graph_info: + raise ValueError("Invalid graph info file, no vertices found.") + vertex_info_path = graph_info["vertices"][0] + if "prefix" in graph_info: + prefix = graph_info["prefix"] + else: + prefix = os.path.dirname(path) + with open(os.path.join(prefix, vertex_info_path), "r") as f: + vertex_info = yaml.safe_load(f) + property_groups = vertex_info["property_groups"] + if len(property_groups) == 0: + raise ValueError("Invalid vertex info file, no property groups found.") + data_type = None + for property_group in property_groups: + properties = property_group["properties"] + if len(properties) == 0: + raise ValueError("Invalid vertex info file, no properties found.") + for property in properties: + if property["is_primary"]: + data_type = property["data_type"] + break + if data_type == "int64": + return "int64_t" + elif data_type == "string": + return "std::string" + else: + raise ValueError("Invalid vertex info file, primary key is not int64 or string.") From dc1e8082d46813ba18683e11d006855f9a2f6a88 Mon Sep 17 00:00:00 2001 From: acezen Date: Mon, 16 Oct 2023 10:08:21 +0800 Subject: [PATCH 3/7] Fix the graph yaml generate bug Committed-by: acezen from Dev container --- python/graphscope/framework/utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/graphscope/framework/utils.py b/python/graphscope/framework/utils.py index 0a4731a73e39..b092a350c7f0 100644 --- a/python/graphscope/framework/utils.py +++ b/python/graphscope/framework/utils.py @@ -796,9 +796,9 @@ def PbDataType2InfoType(str): ) with open(output_path, "w") as f: yaml.dump(info, f, Dumper=Dumper, default_flow_style=False) - graph_info["edges"].append( - r.source + "_" + edge_label + "_" + r.destination + ".edge.yml" - ) + graph_info["edges"].append( + r.source + "_" + edge_label + "_" + r.destination + ".edge.yml" + ) graph_info_path = os.path.join(path, graph_name + ".graph.yml") with open(graph_info_path, "w") as f: yaml.dump(graph_info, f, Dumper=Dumper, default_flow_style=False) From da3418eea20f2cde39d0707f09eb7593ec1afa05 Mon Sep 17 00:00:00 2001 From: acezen Date: Thu, 19 Oct 2023 08:51:35 +0000 Subject: [PATCH 4/7] Test Committed-by: acezen from Dev container Committed-by: acezen from Dev container Committed-by: acezen from Dev container --- python/graphscope/dataset/ldbc.py | 217 ------------------ .../graphscope/tests/unittest/test_graphar.py | 21 +- python/setup.py | 2 +- 3 files changed, 14 insertions(+), 226 deletions(-) diff --git a/python/graphscope/dataset/ldbc.py b/python/graphscope/dataset/ldbc.py index fa91e5541352..f9e345303cc9 100644 --- a/python/graphscope/dataset/ldbc.py +++ b/python/graphscope/dataset/ldbc.py @@ -81,22 +81,6 @@ def load_ldbc(sess=None, prefix=None, directed=True): ["creationDate", "locationIP", "browserUsed", "content", "length"], "id", ), - "organisation": ( - Loader( - os.path.join(prefix, "organisation_0_0.csv"), - header_row=True, - delimiter="|", - ), - ["type", "name", "url"], - "id", - ), - "tagclass": ( - Loader( - os.path.join(prefix, "tagclass_0_0.csv"), header_row=True, delimiter="|" - ), - ["name", "url"], - "id", - ), "person": ( Loader( os.path.join(prefix, "person_0_0.csv"), header_row=True, delimiter="|" @@ -112,20 +96,6 @@ def load_ldbc(sess=None, prefix=None, directed=True): ], "id", ), - "forum": ( - Loader( - os.path.join(prefix, "forum_0_0.csv"), header_row=True, delimiter="|" - ), - ["title", "creationDate"], - "id", - ), - "place": ( - Loader( - os.path.join(prefix, "place_0_0.csv"), header_row=True, delimiter="|" - ), - ["name", "url", "type"], - "id", - ), "post": ( Loader( os.path.join(prefix, "post_0_0.csv"), header_row=True, delimiter="|" @@ -141,11 +111,6 @@ def load_ldbc(sess=None, prefix=None, directed=True): ], "id", ), - "tag": ( - Loader(os.path.join(prefix, "tag_0_0.csv"), header_row=True, delimiter="|"), - ["name", "url"], - "id", - ), } edges = { "replyOf": [ @@ -170,62 +135,6 @@ def load_ldbc(sess=None, prefix=None, directed=True): ("Post.id", "post"), ), ], - "isPartOf": [ - ( - Loader( - os.path.join(prefix, "place_isPartOf_place_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Place.id", "place"), - ("Place.id.1", "place"), - ) - ], - "isSubclassOf": [ - ( - Loader( - os.path.join(prefix, "tagclass_isSubclassOf_tagclass_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("TagClass.id", "tagclass"), - ("TagClass.id.1", "tagclass"), - ) - ], - "hasTag": [ - ( - Loader( - os.path.join(prefix, "forum_hasTag_tag_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Forum.id", "forum"), - ("Tag.id", "tag"), - ), - ( - Loader( - os.path.join(prefix, "comment_hasTag_tag_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Comment.id", "comment"), - ("Tag.id", "tag"), - ), - ( - Loader( - os.path.join(prefix, "post_hasTag_tag_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Post.id", "post"), - ("Tag.id", "tag"), - ), - ], "knows": [ ( Loader( @@ -238,84 +147,6 @@ def load_ldbc(sess=None, prefix=None, directed=True): ("Person.id.1", "person"), ) ], - "hasModerator": [ - ( - Loader( - os.path.join(prefix, "forum_hasModerator_person_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Forum.id", "forum"), - ("Person.id", "person"), - ) - ], - "hasInterest": [ - ( - Loader( - os.path.join(prefix, "person_hasInterest_tag_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Person.id", "person"), - ("Tag.id", "tag"), - ) - ], - "isLocatedIn": [ - ( - Loader( - os.path.join(prefix, "post_isLocatedIn_place_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Post.id", "post"), - ("Place.id", "place"), - ), - ( - Loader( - os.path.join(prefix, "comment_isLocatedIn_place_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Comment.id", "comment"), - ("Place.id", "place"), - ), - ( - Loader( - os.path.join(prefix, "organisation_isLocatedIn_place_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Organisation.id", "organisation"), - ("Place.id", "place"), - ), - ( - Loader( - os.path.join(prefix, "person_isLocatedIn_place_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Person.id", "person"), - ("Place.id", "place"), - ), - ], - "hasType": [ - ( - Loader( - os.path.join(prefix, "tag_hasType_tagclass_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Tag.id", "tag"), - ("TagClass.id", "tagclass"), - ) - ], "hasCreator": [ ( Loader( @@ -338,42 +169,6 @@ def load_ldbc(sess=None, prefix=None, directed=True): ("Person.id", "person"), ), ], - "containerOf": [ - ( - Loader( - os.path.join(prefix, "forum_containerOf_post_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Forum.id", "forum"), - ("Post.id", "post"), - ) - ], - "hasMember": [ - ( - Loader( - os.path.join(prefix, "forum_hasMember_person_0_0.csv"), - header_row=True, - delimiter="|", - ), - ["joinDate"], - ("Forum.id", "forum"), - ("Person.id", "person"), - ) - ], - "workAt": [ - ( - Loader( - os.path.join(prefix, "person_workAt_organisation_0_0.csv"), - header_row=True, - delimiter="|", - ), - ["workFrom"], - ("Person.id", "person"), - ("Organisation.id", "organisation"), - ) - ], "likes": [ ( Loader( @@ -396,17 +191,5 @@ def load_ldbc(sess=None, prefix=None, directed=True): ("Post.id", "post"), ), ], - "studyAt": [ - ( - Loader( - os.path.join(prefix, "person_studyAt_organisation_0_0.csv"), - header_row=True, - delimiter="|", - ), - ["classYear"], - ("Person.id", "person"), - ("Organisation.id", "organisation"), - ) - ], } return sess.load_from(edges, vertices, directed, generate_eid=True, retain_oid=True) diff --git a/python/graphscope/tests/unittest/test_graphar.py b/python/graphscope/tests/unittest/test_graphar.py index 6c34a7cd0de5..364f27b056cd 100644 --- a/python/graphscope/tests/unittest/test_graphar.py +++ b/python/graphscope/tests/unittest/test_graphar.py @@ -19,18 +19,23 @@ import os from graphscope.framework.graph import Graph +from graphscope import pagerank graphar_test_repo_dir = os.path.expandvars("${GS_TEST_DIR}") @pytest.mark.skip(reason="Issue 3162") def test_load_from_graphar(graphscope_session): - graph_yaml = os.path.join( - graphar_test_repo_dir, "graphar/ldbc_sample/parquet/ldbc_sample.graph.yml" - ) - graph_yaml_path = "graphar+file://" + graph_yaml + # graph_yaml = os.path.join( + # graphar_test_repo_dir, "graphar/ldbc_sample/parquet/ldbc_sample.graph.yml" + # ) + # graph_yaml_path = "graphar+file://" + graph_yaml + graph_yaml_path = "graphar+file:///tmp/graphar/ldbc_sample.graph.yml" print(graph_yaml_path) g = Graph.load_from(graph_yaml_path, graphscope_session) + # ldbc_simple = g.project(vertices={"person": []}, edges={"knows": []}) + # ctx = pagerank(ldbc_simple) + # print(ctx.to_dataframe({"id": "v.id", "value": "r"})) assert g.schema is not None del g @@ -39,8 +44,8 @@ def test_load_from_graphar(graphscope_session): def test_save_to_graphar(ldbc_graph): graphar_options = { "graph_name": "ldbc_sample", - "file_type": "orc", - "vertex_block_size": 256, - "edge_block_size": 1024, + "file_type": "parquet", + "vertex_block_size": 500, + "edge_block_size": 500, } - ldbc_graph.save_to("/tmp/", format="graphar", graphar_options=graphar_options) + ldbc_graph.save_to("/tmp/graphar/", format="graphar", graphar_options=graphar_options) diff --git a/python/setup.py b/python/setup.py index 44efa5dc197a..c4e75173c64f 100644 --- a/python/setup.py +++ b/python/setup.py @@ -404,7 +404,7 @@ def parse_version(root, **kwargs): package_dir=resolve_graphscope_package_dir(), packages=find_graphscope_packages(), package_data=parsed_package_data(), - ext_modules=build_learning_engine(), + # ext_modules=build_learning_engine(), cmdclass={ "build_ext": BuildGLExt, "build_gltorch_ext": BuildGLTorchExt, From 280eafc89b2dcfa4b3bc7e58336d208cc6e947c0 Mon Sep 17 00:00:00 2001 From: acezen Date: Tue, 24 Oct 2023 03:45:19 +0000 Subject: [PATCH 5/7] Fix the gar loader Committed-by: acezen from Dev container Committed-by: acezen from Dev container --- .../core/fragment/arrow_projected_fragment.h | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/analytical_engine/core/fragment/arrow_projected_fragment.h b/analytical_engine/core/fragment/arrow_projected_fragment.h index 8a1e2f368110..150cd13c4243 100644 --- a/analytical_engine/core/fragment/arrow_projected_fragment.h +++ b/analytical_engine/core/fragment/arrow_projected_fragment.h @@ -862,17 +862,17 @@ class ArrowProjectedFragment size_t nbytes = 0; if (fragment->directed()) { vineyard::FixedInt64Builder ie_offsets_begin_builder( - client, fragment->tvnums_[v_label]); + client, fragment->ivnums_[v_label]); vineyard::FixedInt64Builder ie_offsets_end_builder( - client, fragment->tvnums_[v_label]); + client, fragment->ivnums_[v_label]); std::shared_ptr ie_boffsets_begin_builder; std::shared_ptr ie_boffsets_end_builder; if (COMPACT) { ie_boffsets_begin_builder = std::make_shared( - client, fragment->tvnums_[v_label]); + client, fragment->ivnums_[v_label]); ie_boffsets_end_builder = std::make_shared( - client, fragment->tvnums_[v_label]); + client, fragment->ivnums_[v_label]); selectEdgeByNeighborLabel( fragment, v_label, fragment->compact_ie_lists_[v_label][e_label]->GetArray(), @@ -914,17 +914,17 @@ class ArrowProjectedFragment oe_boffsets_end; { vineyard::FixedInt64Builder oe_offsets_begin_builder( - client, fragment->tvnums_[v_label]); + client, fragment->ivnums_[v_label]); vineyard::FixedInt64Builder oe_offsets_end_builder( - client, fragment->tvnums_[v_label]); + client, fragment->ivnums_[v_label]); std::shared_ptr oe_boffsets_begin_builder; std::shared_ptr oe_boffsets_end_builder; if (COMPACT) { oe_boffsets_begin_builder = std::make_shared( - client, fragment->tvnums_[v_label]); + client, fragment->ivnums_[v_label]); oe_boffsets_end_builder = std::make_shared( - client, fragment->tvnums_[v_label]); + client, fragment->ivnums_[v_label]); selectEdgeByNeighborLabel( fragment, v_label, fragment->compact_oe_lists_[v_label][e_label]->GetArray(), @@ -1680,7 +1680,6 @@ class ArrowProjectedFragment const std::shared_ptr& nbr_list, const std::shared_ptr& offsets, int64_t* begins, int64_t* ends) { - const int64_t* offset_values = offsets->raw_values(); vineyard::parallel_for( static_cast(0), fragment->tvnums_[v_label], [&](vid_t i) { @@ -1708,7 +1707,7 @@ class ArrowProjectedFragment const int64_t* offset_values = offsets->raw_values(); const int64_t* boffset_values = boffsets->raw_values(); vineyard::parallel_for( - static_cast(0), fragment->tvnums_[v_label], + static_cast(0), fragment->ivnums_[v_label], [&](vid_t i) { int64_t begin = offset_values[i], end = offset_values[i + 1]; int64_t bbegin = boffset_values[i], bend = boffset_values[i + 1]; From bf45634c6dd91067a2b7f8dbdff295cabfa682b5 Mon Sep 17 00:00:00 2001 From: acezen Date: Wed, 25 Oct 2023 06:52:05 +0000 Subject: [PATCH 6/7] Test Committed-by: acezen from Dev container Committed-by: acezen from Dev container --- .../core/fragment/arrow_projected_fragment.h | 17 +++++++++-------- .../graphscope/tests/unittest/test_graphar.py | 6 +++--- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/analytical_engine/core/fragment/arrow_projected_fragment.h b/analytical_engine/core/fragment/arrow_projected_fragment.h index 150cd13c4243..8622453ade99 100644 --- a/analytical_engine/core/fragment/arrow_projected_fragment.h +++ b/analytical_engine/core/fragment/arrow_projected_fragment.h @@ -862,17 +862,17 @@ class ArrowProjectedFragment size_t nbytes = 0; if (fragment->directed()) { vineyard::FixedInt64Builder ie_offsets_begin_builder( - client, fragment->ivnums_[v_label]); + client, fragment->tvnums_[v_label]); vineyard::FixedInt64Builder ie_offsets_end_builder( - client, fragment->ivnums_[v_label]); + client, fragment->tvnums_[v_label]); std::shared_ptr ie_boffsets_begin_builder; std::shared_ptr ie_boffsets_end_builder; if (COMPACT) { ie_boffsets_begin_builder = std::make_shared( - client, fragment->ivnums_[v_label]); + client, fragment->tvnums_[v_label]); ie_boffsets_end_builder = std::make_shared( - client, fragment->ivnums_[v_label]); + client, fragment->tvnums_[v_label]); selectEdgeByNeighborLabel( fragment, v_label, fragment->compact_ie_lists_[v_label][e_label]->GetArray(), @@ -914,17 +914,17 @@ class ArrowProjectedFragment oe_boffsets_end; { vineyard::FixedInt64Builder oe_offsets_begin_builder( - client, fragment->ivnums_[v_label]); + client, fragment->tvnums_[v_label]); vineyard::FixedInt64Builder oe_offsets_end_builder( - client, fragment->ivnums_[v_label]); + client, fragment->tvnums_[v_label]); std::shared_ptr oe_boffsets_begin_builder; std::shared_ptr oe_boffsets_end_builder; if (COMPACT) { oe_boffsets_begin_builder = std::make_shared( - client, fragment->ivnums_[v_label]); + client, fragment->tvnums_[v_label]); oe_boffsets_end_builder = std::make_shared( - client, fragment->ivnums_[v_label]); + client, fragment->tvnums_[v_label]); selectEdgeByNeighborLabel( fragment, v_label, fragment->compact_oe_lists_[v_label][e_label]->GetArray(), @@ -1680,6 +1680,7 @@ class ArrowProjectedFragment const std::shared_ptr& nbr_list, const std::shared_ptr& offsets, int64_t* begins, int64_t* ends) { + const int64_t* offset_values = offsets->raw_values(); vineyard::parallel_for( static_cast(0), fragment->tvnums_[v_label], [&](vid_t i) { diff --git a/python/graphscope/tests/unittest/test_graphar.py b/python/graphscope/tests/unittest/test_graphar.py index 364f27b056cd..e988e7969547 100644 --- a/python/graphscope/tests/unittest/test_graphar.py +++ b/python/graphscope/tests/unittest/test_graphar.py @@ -33,9 +33,9 @@ def test_load_from_graphar(graphscope_session): graph_yaml_path = "graphar+file:///tmp/graphar/ldbc_sample.graph.yml" print(graph_yaml_path) g = Graph.load_from(graph_yaml_path, graphscope_session) - # ldbc_simple = g.project(vertices={"person": []}, edges={"knows": []}) - # ctx = pagerank(ldbc_simple) - # print(ctx.to_dataframe({"id": "v.id", "value": "r"})) + ldbc_simple = g.project(vertices={"person": []}, edges={"knows": []}) + ctx = pagerank(ldbc_simple) + print(ctx.to_dataframe({"id": "v.id", "value": "r"})) assert g.schema is not None del g From 1ea86b3f959be8c950f8741b1d120a3498bac297 Mon Sep 17 00:00:00 2001 From: acezen Date: Wed, 25 Oct 2023 07:13:24 +0000 Subject: [PATCH 7/7] Fix Committed-by: acezen from Dev container --- coordinator/gscoordinator/op_executor.py | 11 +---------- python/graphscope/framework/utils.py | 9 +++++++++ python/graphscope/tests/unittest/test_graphar.py | 2 -- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/coordinator/gscoordinator/op_executor.py b/coordinator/gscoordinator/op_executor.py index b4d8be03a857..6d457f87d304 100644 --- a/coordinator/gscoordinator/op_executor.py +++ b/coordinator/gscoordinator/op_executor.py @@ -760,18 +760,9 @@ def _process_deserialize_graph(self, op: op_def_pb2.OpDef): "\n" ) storage_options = json.loads(op.attr[types_pb2.STORAGE_OPTIONS].s.decode()) -<<<<<<< HEAD + deseralization_options = json.loads(op.attr[types_pb2.DESERIALIZATION_OPTIONS].s.decode()) vineyard_endpoint = self._launcher.vineyard_endpoint vineyard_ipc_socket = self._launcher.vineyard_socket -======= - deseralization_options = json.loads(op.attr[types_pb2.DESERIALIZATION_OPTIONS].s.decode()) - engine_config = self.get_analytical_engine_config() - if self._launcher.type() == types_pb2.HOSTS: - vineyard_endpoint = engine_config["vineyard_rpc_endpoint"] - else: - vineyard_endpoint = self._launcher._vineyard_internal_endpoint - vineyard_ipc_socket = engine_config["vineyard_socket"] ->>>>>>> 54ddef33 (Prototype for load_from and write_to) deployment, hosts = self._launcher.get_vineyard_stream_info() path = op.attr[types_pb2.GRAPH_SERIALIZATION_PATH].s.decode() graph_id = vineyard.io.deserialize( diff --git a/python/graphscope/framework/utils.py b/python/graphscope/framework/utils.py index b092a350c7f0..07c1c444619a 100644 --- a/python/graphscope/framework/utils.py +++ b/python/graphscope/framework/utils.py @@ -696,6 +696,15 @@ def wrapper(*args, **kwargs): return decorator +def apply_docstring(fn): + """Apply the docstring of `fn` to annotated function.""" + + def decorator(func): + func.__doc__ = fn.__doc__ + return func + + return decorator + def generate_graphar_info_from_schema(path, schema, graphar_options): import copy diff --git a/python/graphscope/tests/unittest/test_graphar.py b/python/graphscope/tests/unittest/test_graphar.py index e988e7969547..7b59f9e60323 100644 --- a/python/graphscope/tests/unittest/test_graphar.py +++ b/python/graphscope/tests/unittest/test_graphar.py @@ -24,7 +24,6 @@ graphar_test_repo_dir = os.path.expandvars("${GS_TEST_DIR}") -@pytest.mark.skip(reason="Issue 3162") def test_load_from_graphar(graphscope_session): # graph_yaml = os.path.join( # graphar_test_repo_dir, "graphar/ldbc_sample/parquet/ldbc_sample.graph.yml" @@ -40,7 +39,6 @@ def test_load_from_graphar(graphscope_session): del g -@pytest.mark.skip(reason="Issue 3162") def test_save_to_graphar(ldbc_graph): graphar_options = { "graph_name": "ldbc_sample",