Skip to content

Commit 613c2d8

Browse files
committed
Prototype for load_from and write_to
Signed-off-by: acezen <[email protected]> Add doc Update Update Unify the API Runnable Signed-off-by: acezen <[email protected]> Committed-by: acezen from Dev container Committed-by: acezen from Dev container Committed-by: acezen from Dev container Rebase Signed-off-by: acezen <[email protected]> Committed-by: acezen from Dev container Format Committed-by: acezen from Dev container Fix Committed-by: acezen from Dev container Update Committed-by: acezen from Dev container Fix Signed-off-by: acezen <[email protected]> Committed-by: acezen from Dev container Fix Committed-by: acezen from Dev container Committed-by: acezen from Dev container Committed-by: acezen from Dev container Committed-by: acezen from Dev container Format Committed-by: acezen from Dev container Committed-by: acezen from Dev container Update Committed-by: acezen from Dev container Committed-by: acezen from Dev container Update
1 parent 9ea468d commit 613c2d8

File tree

10 files changed

+253
-137
lines changed

10 files changed

+253
-137
lines changed

analytical_engine/frame/property_graph_frame.cc

-2
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,6 @@ LoadGraph(const grape::CommSpec& comm_spec, vineyard::Client& client,
112112
#ifdef ENABLE_GAR
113113
BOOST_LEAF_AUTO(graph_info_path,
114114
params.Get<std::string>(gs::rpc::GRAPH_INFO_PATH));
115-
BOOST_LEAF_ASSIGN(generate_eid, params.Get<bool>(gs::rpc::GENERATE_EID));
116-
BOOST_LEAF_ASSIGN(retain_oid, params.Get<bool>(gs::rpc::RETAIN_OID));
117115
using loader_t =
118116
vineyard::gar_fragment_loader_t<oid_t, vid_t, vertex_map_t>;
119117
loader_t loader(client, comm_spec, graph_info_path);

coordinator/gscoordinator/op_executor.py

+4
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,7 @@ def _process_serialize_graph(self, op: op_def_pb2.OpDef):
727727
"\n"
728728
)
729729
storage_options = json.loads(op.attr[types_pb2.STORAGE_OPTIONS].s.decode())
730+
serialization_options = json.loads(op.attr[types_pb2.SERIALIZATION_OPTIONS].s.decode())
730731
engine_config = self.get_analytical_engine_config()
731732
if self._launcher.type() == types_pb2.HOSTS:
732733
vineyard_endpoint = engine_config["vineyard_rpc_endpoint"]
@@ -743,6 +744,7 @@ def _process_serialize_graph(self, op: op_def_pb2.OpDef):
743744
vineyard_ipc_socket=vineyard_ipc_socket,
744745
vineyard_endpoint=vineyard_endpoint,
745746
storage_options=storage_options,
747+
serialization_options=serialization_options,
746748
deployment=deployment,
747749
hosts=hosts,
748750
)
@@ -763,6 +765,7 @@ def _process_deserialize_graph(self, op: op_def_pb2.OpDef):
763765
"\n"
764766
)
765767
storage_options = json.loads(op.attr[types_pb2.STORAGE_OPTIONS].s.decode())
768+
deseralization_options = json.loads(op.attr[types_pb2.DESERIALIZATION_OPTIONS].s.decode())
766769
engine_config = self.get_analytical_engine_config()
767770
if self._launcher.type() == types_pb2.HOSTS:
768771
vineyard_endpoint = engine_config["vineyard_rpc_endpoint"]
@@ -777,6 +780,7 @@ def _process_deserialize_graph(self, op: op_def_pb2.OpDef):
777780
vineyard_ipc_socket=vineyard_ipc_socket,
778781
vineyard_endpoint=vineyard_endpoint,
779782
storage_options=storage_options,
783+
deseralization_options=deseralization_options,
780784
deployment=deployment,
781785
hosts=hosts,
782786
)

python/graphscope/__init__.py

-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
from graphscope.framework.errors import *
5151
from graphscope.framework.graph import Graph
5252
from graphscope.framework.graph_builder import load_from
53-
from graphscope.framework.graph_builder import load_from_gar
5453
from graphscope.version import __version__
5554

5655
__doc__ = """

python/graphscope/client/session.py

-7
Original file line numberDiff line numberDiff line change
@@ -1286,13 +1286,6 @@ def load_from(self, *args, **kwargs):
12861286
with default_session(self):
12871287
return graphscope.load_from(*args, **kwargs)
12881288

1289-
def load_from_gar(self, *args, **kwargs):
1290-
"""Load a graph from gar format files within the session.
1291-
See more information in :meth:`graphscope.load_from_gar`.
1292-
"""
1293-
with default_session(self):
1294-
return graphscope.load_from_gar(*args, **kwargs)
1295-
12961289
def _run_on_local(self):
12971290
self._config_params["port"] = None
12981291
self._config_params["vineyard_socket"] = ""

python/graphscope/framework/dag_utils.py

+31-10
Original file line numberDiff line numberDiff line change
@@ -1057,25 +1057,30 @@ def archive_graph(graph, path):
10571057
return op
10581058

10591059

1060-
def save_graph_to(
1061-
graph,
1062-
path: str,
1063-
vineyard_id,
1064-
**kwargs,
1060+
def serialize_graph(
1061+
graph, path: str, storage_options: dict, serialization_options: dict
10651062
):
10661063
"""Serialize graph to the specified location
1064+
The meta and data of the graph are dumped to the specified location,
1065+
and can be restored by `Graph.load_from` in other sessions.
10671066
1067+
Each worker will write a `path_{worker_id}.meta` file and
1068+
a `path_{worker_id}` file to storage.
10681069
Args:
10691070
graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source graph.
1070-
path (str): The path to serialize the graph, on each worker.
1071+
path (str): The path to serialize the graph, on each worker, supported
1072+
storages are local, hdfs, oss, s3
10711073
10721074
Returns:
10731075
An op to serialize the graph to a path.
10741076
"""
10751077
config = {
10761078
types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path),
1077-
types_pb2.VINEYARD_ID: utils.i_to_attr(vineyard_id),
1078-
types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)),
1079+
types_pb2.VINEYARD_ID: utils.i_to_attr(graph._vineyard_id),
1080+
types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(storage_options)),
1081+
types_pb2.SERIALIZATION_OPTIONS: utils.s_to_attr(
1082+
json.dumps(serialization_options)
1083+
),
10791084
}
10801085
op = Operation(
10811086
graph.session_id,
@@ -1087,10 +1092,26 @@ def save_graph_to(
10871092
return op
10881093

10891094

1090-
def load_graph_from(path: str, sess, **kwargs):
1095+
def deserialize_graph(
1096+
path: str, sess, storage_options: dict, deserialization_options: dict
1097+
):
1098+
"""Deserialize graph from the specified location.
1099+
1100+
Args:
1101+
path (str): The path contains the serialization files.
1102+
sess (`graphscope.Session`): The target session
1103+
that the graph will be constructed in.
1104+
1105+
Returns:
1106+
`Graph`: A new graph object. Its schema and data are supposed to be
1107+
identical to those of the graph that was serialized.
1108+
"""
10911109
config = {
10921110
types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path),
1093-
types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)),
1111+
types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(storage_options)),
1112+
types_pb2.DESERIALIZATION_OPTIONS: utils.s_to_attr(
1113+
json.dumps(deserialization_options)
1114+
),
10941115
}
10951116
op = Operation(
10961117
sess.session_id,

python/graphscope/framework/graph.py

+86-57
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from typing import List
2727
from typing import Mapping
2828
from typing import Union
29+
from urllib.parse import urlparse
2930

3031
try:
3132
import vineyard
@@ -103,9 +104,6 @@ def save_to(self, path, **kwargs):
103104
def load_from(cls, path, sess, **kwargs):
104105
raise NotImplementedError
105106

106-
def archive(self, path, **kwargs):
107-
raise NotImplementedError
108-
109107
@abstractmethod
110108
def project(self, vertices, edges):
111109
raise NotImplementedError
@@ -433,16 +431,6 @@ def to_dataframe(self, selector, vertex_range=None):
433431
op = dag_utils.graph_to_dataframe(self, selector, vertex_range)
434432
return ResultDAGNode(self, op)
435433

436-
def archive(self, path):
437-
"""Archive the graph to gar format with graph yaml file path.
438-
439-
Args:
440-
path (str): The graph yaml file path describe how to archive the graph.
441-
"""
442-
check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY)
443-
op = dag_utils.archive_graph(self, path)
444-
return ArchivedGraph(self._session, op)
445-
446434
def to_directed(self):
447435
op = dag_utils.to_directed(self)
448436
graph_dag_node = GraphDAGNode(self._session, op)
@@ -1082,49 +1070,101 @@ def _check_unmodified(self):
10821070
self.signature == self._saved_signature, "Graph has been modified!"
10831071
)
10841072

1085-
def save_to(self, path, **kwargs):
1086-
"""Serialize graph to a location.
1087-
The meta and data of graph is dumped to specified location,
1088-
and can be restored by `Graph.load_from` in other sessions.
1089-
1090-
Each worker will write a `path_{worker_id}.meta` file and
1091-
a `path_{worker_id}` file to storage.
1092-
Args:
1093-
path (str): supported storages are local, hdfs, oss, s3
1094-
"""
1095-
1096-
op = dag_utils.save_graph_to(self, path, self._vineyard_id, **kwargs)
1097-
self._session.dag.add_op(op)
1098-
return self._session._wrapper(op)
1073+
@staticmethod
1074+
def _load_from_graphar(path, sess, **kwargs):
1075+
# GraphAr currently supports only the global vertex map.
1076+
vertex_map = utils.vertex_map_type_to_enum("global")
1077+
config = {
1078+
types_pb2.OID_TYPE: utils.s_to_attr(
1079+
"int64_t"
1080+
), # GraphAr uses the vertex index as the oid, so it is always int64_t
1081+
types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
1082+
types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
1083+
types_pb2.IS_FROM_GAR: utils.b_to_attr(True),
1084+
types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(vertex_map),
1085+
types_pb2.COMPACT_EDGES: utils.b_to_attr(False),
1086+
types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(path),
1087+
}
1088+
op = dag_utils.create_graph(
1089+
sess.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=[], attrs=config
1090+
)
1091+
return sess._wrapper(GraphDAGNode(sess, op))
10991092

11001093
@classmethod
1101-
def load_from(cls, path, sess, **kwargs):
1102-
"""Construct a `Graph` by deserialize from `path`.
1103-
It will read all serialization files, which is dumped by
1104-
`Graph.serialize`.
1105-
If any serialize file doesn't exists or broken, will error out.
1094+
def load_from(cls, uristring, sess=None, **kwargs):
1095+
"""Load an ArrowProperty graph from a given data source. The data source
1096+
can be vineyard serialized files or graphar files.
11061097
11071098
Args:
1108-
path (str): Path contains the serialization files.
1109-
sess (`graphscope.Session`): The target session
1110-
that the graph will be construct in
1111-
1099+
uristring (str): A URI describing the data source, or a
1100+
path containing the serialization files,
1101+
example: "graphar+file:///tmp/graphar/xxx"
1102+
sess (`graphscope.Session`): The target session in which the graph
1103+
will be constructed. If None, the default session is used.
1104+
kwargs: Other arguments that will be passed to the data source loader.
11121105
Returns:
1113-
`Graph`: A new graph object. Schema and data is supposed to be
1114-
identical with the one that called serialized method.
1106+
`Graph`: A new graph object.
11151107
"""
1116-
op = dag_utils.load_graph_from(path, sess, **kwargs)
1117-
return sess._wrapper(GraphDAGNode(sess, op))
1108+
from graphscope.client.session import get_default_session
1109+
1110+
if sess is None:
1111+
sess = get_default_session()
1112+
uri = urlparse(uristring)
1113+
if uri.scheme and "+" in uri.scheme:
1114+
source = uri.scheme.split("+")[0]
1115+
path = uri.scheme.split("+")[-1] + "://" + uri.netloc + uri.path
1116+
if source == "graphar":
1117+
return cls._load_from_graphar(path, sess)
1118+
else:
1119+
raise ValueError("Unknown source: %s" % source)
1120+
else:
1121+
# not a uri string, assume it is a path for deserialization
1122+
storage_options = kwargs.pop("storage_options", {})
1123+
deserialization_options = kwargs.pop("deserialization_options", {})
1124+
op = dag_utils.deserialize_graph(
1125+
uristring, sess, storage_options, deserialization_options
1126+
)
1127+
return sess._wrapper(GraphDAGNode(sess, op))
11181128

1119-
def archive(self, path):
1120-
"""Archive graph gar format files base on the graph info.
1121-
The meta and data of graph is dumped to specified location,
1122-
and can be restored by `Graph.deserialize` in other sessions.
1129+
def save_to(
1130+
self,
1131+
path,
1132+
format="serialization",
1133+
**kwargs,
1134+
):
1135+
"""Save graph to specified location with specified format.
11231136
11241137
Args:
1125-
path (str): the graph info file path.
1138+
path (str): the directory path to write graph.
1139+
format (str): the format to write graph, default is "serialization".
1140+
kwargs: Other arguments that will be passed to the data source
1141+
saver.
1142+
1143+
Return (dict): A dict containing the type and URI string of the output data.
11261144
"""
1127-
return self._session._wrapper(self._graph_node.archive(path))
1145+
if format == "graphar":
1146+
graphar_options = kwargs.pop("graphar_options", {})
1147+
graph_info_path = utils.generate_graphar_info_from_schema(
1148+
path,
1149+
self._schema,
1150+
graphar_options,
1151+
)
1152+
op = dag_utils.save_to_graphar(self, graph_info_path)
1153+
self._session.dag.add_op(op)
1154+
self._session._wrapper(op)
1155+
return {"type": format, "uri": "graphar+" + graph_info_path}
1156+
elif format == "serialization":
1157+
# serialize graph
1158+
storage_options = kwargs.pop("storage_options", {})
1159+
serialization_options = kwargs.pop("serialization_options", {})
1160+
op = dag_utils.serialize_graph(
1161+
self, path, storage_options, serialization_options
1162+
)
1163+
self._session.dag.add_op(op)
1164+
self._session._wrapper(op)
1165+
return {"type": format, "uri": path}
1166+
else:
1167+
raise ValueError("Unknown format: %s" % format)
11281168

11291169
def add_vertices(
11301170
self, vertices, label="_", properties=None, vid_field: Union[int, str] = 0
@@ -1201,14 +1241,3 @@ def __init__(self, session, op):
12011241
self._op = op
12021242
# add op to dag
12031243
self._session.dag.add_op(self._op)
1204-
1205-
1206-
class ArchivedGraph(DAGNode):
1207-
"""Archived graph node in a DAG"""
1208-
1209-
def __init__(self, session, op):
1210-
super().__init__()
1211-
self._session = session
1212-
self._op = op
1213-
# add op to dag
1214-
self._session.dag.add_op(self._op)

python/graphscope/framework/graph_builder.py

-50
Original file line numberDiff line numberDiff line change
@@ -210,53 +210,3 @@ def load_from(
210210
use_perfect_hash=use_perfect_hash,
211211
)
212212
return graph
213-
214-
215-
def load_from_gar(
216-
graph_info_path: str,
217-
directed=True,
218-
oid_type="int64_t",
219-
vertex_map="global",
220-
compact_edges=False,
221-
use_perfect_hash=False,
222-
) -> Graph:
223-
sess = get_default_session()
224-
oid_type = utils.normalize_data_type_str(oid_type)
225-
if oid_type not in ("int32_t", "int64_t", "std::string"):
226-
raise ValueError("The 'oid_type' can only be int32_t, int64_t or string.")
227-
if compact_edges:
228-
raise ValueError(
229-
"Loading from gar with 'compact_edges' hasn't been supported yet."
230-
)
231-
if use_perfect_hash:
232-
raise ValueError(
233-
"Loading from gar with 'use_perfect_hash' hasn't been supported yet."
234-
)
235-
# generate and add a loader op to dag
236-
vertex_map = utils.vertex_map_type_to_enum(vertex_map)
237-
# construct create graph op
238-
config = {
239-
types_pb2.DIRECTED: utils.b_to_attr(directed),
240-
types_pb2.OID_TYPE: utils.s_to_attr(oid_type),
241-
types_pb2.GENERATE_EID: utils.b_to_attr(False),
242-
types_pb2.RETAIN_OID: utils.b_to_attr(False),
243-
types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
244-
types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
245-
types_pb2.IS_FROM_GAR: utils.b_to_attr(True),
246-
types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(vertex_map),
247-
types_pb2.COMPACT_EDGES: utils.b_to_attr(compact_edges),
248-
types_pb2.USE_PERFECT_HASH: utils.b_to_attr(use_perfect_hash),
249-
types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(graph_info_path),
250-
}
251-
op = dag_utils.create_graph(
252-
sess.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=[], attrs=config
253-
)
254-
graph = sess.g(
255-
op,
256-
oid_type=oid_type,
257-
directed=directed,
258-
vertex_map=vertex_map,
259-
compact_edges=compact_edges,
260-
use_perfect_hash=use_perfect_hash,
261-
)
262-
return graph

0 commit comments

Comments
 (0)