Skip to content

Commit e5886f5

Browse files
committed
Format
Committed-by: acezen from Dev container
1 parent 21447f7 commit e5886f5

File tree

7 files changed

+74
-118
lines changed

7 files changed

+74
-118
lines changed

python/graphscope/__init__.py

-1
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,6 @@
4949
from graphscope.framework.errors import *
5050
from graphscope.framework.graph import Graph
5151
from graphscope.framework.graph_builder import load_from
52-
from graphscope.framework.graph_builder import load_from_gar
5352
from graphscope.version import __version__
5453

5554
__doc__ = """

python/graphscope/client/session.py

-7
Original file line number | Diff line number | Diff line change
@@ -1293,13 +1293,6 @@ def load_from(self, *args, **kwargs):
12931293
with default_session(self):
12941294
return graphscope.load_from(*args, **kwargs)
12951295

1296-
def load_from_gar(self, *args, **kwargs):
1297-
"""Load a graph from gar format files within the session.
1298-
See more information in :meth:`graphscope.load_from_gar`.
1299-
"""
1300-
with default_session(self):
1301-
return graphscope.load_from_gar(*args, **kwargs)
1302-
13031296
def _run_on_local(self):
13041297
self._config_params["port"] = None
13051298
self._config_params["vineyard_socket"] = ""

python/graphscope/framework/dag_utils.py

+3-7
Original file line number | Diff line number | Diff line change
@@ -1077,8 +1077,8 @@ def fetch_gremlin_result(result_set, fetch_type="one"):
10771077
return op
10781078

10791079

1080-
def archive_graph(graph, path):
1081-
"""Archive a graph to gar format with a path.
1080+
def save_to_graphar(graph, path):
1081+
"""Save a graph to graphar format with a path.
10821082
10831083
Args:
10841084
graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source graph.
@@ -1105,11 +1105,7 @@ def archive_graph(graph, path):
11051105
return op
11061106

11071107

1108-
def serialize_graph(
1109-
graph,
1110-
path: str,
1111-
**kwargs
1112-
):
1108+
def serialize_graph(graph, path: str, **kwargs):
11131109
"""Serialize graph to the specified location
11141110
The meta and data of graph is dumped to specified location,
11151111
and can be restored by `Graph.load_from` in other sessions.

python/graphscope/framework/graph.py

+25-37
Original file line number | Diff line number | Diff line change
@@ -103,9 +103,6 @@ def save_to(self, path, **kwargs):
103103
def load_from(cls, path, sess, **kwargs):
104104
raise NotImplementedError
105105

106-
def archive(self, path, **kwargs):
107-
raise NotImplementedError
108-
109106
@abstractmethod
110107
def project(self, vertices, edges):
111108
raise NotImplementedError
@@ -428,16 +425,6 @@ def to_dataframe(self, selector, vertex_range=None):
428425
op = dag_utils.graph_to_dataframe(self, selector, vertex_range)
429426
return ResultDAGNode(self, op)
430427

431-
def archive(self, path):
432-
"""Archive the graph to gar format with graph yaml file path.
433-
434-
Args:
435-
path (str): The graph yaml file path describe how to archive the graph.
436-
"""
437-
check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY)
438-
op = dag_utils.archive_graph(self, path)
439-
return ArchivedGraph(self._session, op)
440-
441428
def to_directed(self):
442429
op = dag_utils.to_directed(self)
443430
graph_dag_node = GraphDAGNode(self._session, op)
@@ -1104,6 +1091,7 @@ def load_from(cls, uristring, sess=None):
11041091
`Graph`: A new graph object. Schema and data is supposed to be
11051092
"""
11061093
from graphscope.client.session import get_default_session
1094+
11071095
if sess is None:
11081096
sess = get_default_session()
11091097
uri = urlparse(uristring)
@@ -1116,19 +1104,20 @@ def load_from(cls, uristring, sess=None):
11161104
raise ValueError("Unknown source: %s" % source)
11171105
else:
11181106
# not a uri string, assume it is a path for deserialization
1119-
op = dag_utils.deserialize_graph(path, sess, **kwargs)
1107+
op = dag_utils.deserialize_graph(path, sess)
11201108
return sess._wrapper(GraphDAGNode(sess, op))
11211109

1122-
def save_to(self,
1123-
path,
1124-
format="serialization",
1125-
graphar_graph_name="graph",
1126-
graphar_vertex_block_size=262144, # 2^18
1127-
graphar_edge_block_size=4194304, # 2^22
1128-
graphar_file_format="parquet",
1129-
graphar_version="v1",
1130-
):
1131-
""" Save graph to specified location with specified format.
1110+
def save_to(
1111+
self,
1112+
path,
1113+
format="serialization",
1114+
graphar_graph_name="graph",
1115+
graphar_vertex_block_size=262144, # 2^18
1116+
graphar_edge_block_size=4194304, # 2^22
1117+
graphar_file_format="parquet",
1118+
graphar_version="v1",
1119+
):
1120+
"""Save graph to specified location with specified format.
11321121
11331122
Args:
11341123
path (str): the directory path to write graph.
@@ -1143,12 +1132,21 @@ def save_to(self,
11431132
"""
11441133
if format == "graphar":
11451134
graph_info_path = utils.generate_graphar_info_from_schema(
1146-
path, self._schema, graphar_graph_name, graphar_vertex_block_size, graphar_edge_block_size, graphar_file_format, graphar_version)
1147-
self._session._wrapper(self._graph_node.archive(graph_info_path))
1135+
path,
1136+
self._schema,
1137+
graphar_graph_name,
1138+
graphar_vertex_block_size,
1139+
graphar_edge_block_size,
1140+
graphar_file_format,
1141+
graphar_version,
1142+
)
1143+
op = dag_utils.save_to_graphar(self, graph_info_path)
1144+
self._session.dag.add_op(op)
1145+
self._session._wrapper(op)
11481146
return {"type": format, "uri": "graphar+" + graph_info_path}
11491147
elif format == "serialization":
11501148
# serialize graph
1151-
op = dag_utils.serialize_graph(self, path, **kwargs)
1149+
op = dag_utils.serialize_graph(self, path)
11521150
self._session.dag.add_op(op)
11531151
self._session._wrapper(op)
11541152
return {"type": format, "uri": path}
@@ -1228,13 +1226,3 @@ def __init__(self, session, op):
12281226
self._op = op
12291227
# add op to dag
12301228
self._session.dag.add_op(self._op)
1231-
1232-
1233-
class ArchivedGraph(DAGNode):
1234-
"""Archived graph node in a DAG"""
1235-
1236-
def __init__(self, session, op):
1237-
self._session = session
1238-
self._op = op
1239-
# add op to dag
1240-
self._session.dag.add_op(self._op)

python/graphscope/framework/graph_builder.py

-43
Original file line number | Diff line number | Diff line change
@@ -205,46 +205,3 @@ def load_from(
205205
compact_edges=compact_edges,
206206
)
207207
return graph
208-
209-
210-
def load_from_gar(
211-
graph_info_path: str,
212-
directed=True,
213-
oid_type="int64_t",
214-
vertex_map="global",
215-
compact_edges=False,
216-
) -> Graph:
217-
sess = get_default_session()
218-
oid_type = utils.normalize_data_type_str(oid_type)
219-
if oid_type not in ("int32_t", "int64_t", "std::string"):
220-
raise ValueError("The 'oid_type' can only be int32_t, int64_t or string.")
221-
if compact_edges:
222-
raise ValueError(
223-
"Loading from gar with 'compact_edges' hasn't been supported yet."
224-
)
225-
# generate and add a loader op to dag
226-
vertex_map = utils.vertex_map_type_to_enum(vertex_map)
227-
# construct create graph op
228-
config = {
229-
types_pb2.DIRECTED: utils.b_to_attr(directed),
230-
types_pb2.OID_TYPE: utils.s_to_attr(oid_type),
231-
types_pb2.GENERATE_EID: utils.b_to_attr(False),
232-
types_pb2.RETAIN_OID: utils.b_to_attr(False),
233-
types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
234-
types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
235-
types_pb2.IS_FROM_GAR: utils.b_to_attr(True),
236-
types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(vertex_map),
237-
types_pb2.COMPACT_EDGES: utils.b_to_attr(compact_edges),
238-
types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(graph_info_path),
239-
}
240-
op = dag_utils.create_graph(
241-
sess.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=[], attrs=config
242-
)
243-
graph = sess.g(
244-
op,
245-
oid_type=oid_type,
246-
directed=directed,
247-
vertex_map=vertex_map,
248-
compact_edges=compact_edges,
249-
)
250-
return graph

python/graphscope/framework/utils.py

+39-18
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,6 @@
2525
import socket
2626
import string
2727
import subprocess
28-
import yaml
2928
import tempfile
3029
import threading
3130
import time
@@ -36,6 +35,7 @@
3635
import numpy as np
3736
import pandas as pd
3837
import psutil
38+
import yaml
3939
from google.protobuf.any_pb2 import Any
4040

4141
from graphscope.client.archive import OutArchive
@@ -685,15 +685,10 @@ def _to_numpy_dtype(dtype):
685685

686686

687687
def generate_graphar_info_from_schema(
688-
path,
689-
schema,
690-
graph_name,
691-
vertex_block_size,
692-
edge_block_size,
693-
file_type,
694-
version):
695-
688+
path, schema, graph_name, vertex_block_size, edge_block_size, file_type, version
689+
):
696690
import copy
691+
697692
class Dumper(yaml.Dumper):
698693
def increase_indent(self, flow=False, indentless=False):
699694
return super(Dumper, self).increase_indent(flow, False)
@@ -726,31 +721,53 @@ def PbDataType2InfoType(str):
726721
info["version"] = "gar/{}".format(version)
727722
info["property_groups"] = [{"properties": [], "file_type": file_type}]
728723
for property in schema.get_vertex_properties(vertex_label):
729-
info["property_groups"][0]["properties"].append({"name": property.name, "data_type": PbDataType2InfoType(graph_def_pb2.DataTypePb.Name(property.data_type)), "is_primary": True if property.name == "id" else False})
730-
output_path = os.path.join(path, vertex_label+".vertex.yml")
724+
info["property_groups"][0]["properties"].append(
725+
{
726+
"name": property.name,
727+
"data_type": PbDataType2InfoType(
728+
graph_def_pb2.DataTypePb.Name(property.data_type)
729+
),
730+
"is_primary": True if property.name == "id" else False,
731+
}
732+
)
733+
output_path = os.path.join(path, vertex_label + ".vertex.yml")
731734
with open(output_path, "w") as f:
732735
yaml.dump(info, f, Dumper=Dumper, default_flow_style=False)
733-
graph_info["vertices"].append(vertex_label+".vertex.yml")
736+
graph_info["vertices"].append(vertex_label + ".vertex.yml")
734737
# process edge info
735738
for edge_label in schema.edge_labels:
736739
properties = []
737740
for property in schema.get_edge_properties(edge_label):
738-
properties.append({"name": property.name, "data_type": PbDataType2InfoType(graph_def_pb2.DataTypePb.Name(property.data_type)), "is_primary": False})
741+
properties.append(
742+
{
743+
"name": property.name,
744+
"data_type": PbDataType2InfoType(
745+
graph_def_pb2.DataTypePb.Name(property.data_type)
746+
),
747+
"is_primary": False,
748+
}
749+
)
739750
csr_adj_list = {
740751
"file_type": file_type,
741-
"property_groups": [{"properties": copy.deepcopy(properties), "file_type": file_type}],
752+
"property_groups": [
753+
{"properties": copy.deepcopy(properties), "file_type": file_type}
754+
],
742755
"ordered": True,
743756
"aligned_by": "src",
744757
}
745758
csc_adj_list = {
746759
"file_type": file_type,
747-
"property_groups": [{"properties": copy.deepcopy(properties), "file_type": file_type}],
760+
"property_groups": [
761+
{"properties": copy.deepcopy(properties), "file_type": file_type}
762+
],
748763
"ordered": True,
749764
"aligned_by": "dst",
750765
}
751766
for r in schema.get_relationships(edge_label):
752767
info = dict()
753-
info["prefix"] = "edge/" + r.source + "_" + edge_label + "_" + r.destination + "/"
768+
info["prefix"] = (
769+
"edge/" + r.source + "_" + edge_label + "_" + r.destination + "/"
770+
)
754771
info["edge_label"] = edge_label
755772
info["src_label"] = r.source
756773
info["dst_label"] = r.destination
@@ -759,10 +776,14 @@ def PbDataType2InfoType(str):
759776
info["dst_chunk_size"] = vertex_block_size
760777
info["version"] = "gar/{}".format(version)
761778
info["adj_lists"] = [csr_adj_list, csc_adj_list]
762-
output_path = os.path.join(path, r.source + "_" + edge_label + "_" + r.destination + ".edge.yml")
779+
output_path = os.path.join(
780+
path, r.source + "_" + edge_label + "_" + r.destination + ".edge.yml"
781+
)
763782
with open(output_path, "w") as f:
764783
yaml.dump(info, f, Dumper=Dumper, default_flow_style=False)
765-
graph_info["edges"].append(r.source + "_" + edge_label + "_" + r.destination + ".edge.yml")
784+
graph_info["edges"].append(
785+
r.source + "_" + edge_label + "_" + r.destination + ".edge.yml"
786+
)
766787
graph_info_path = os.path.join(path, graph_name + ".graph.yml")
767788
with open(graph_info_path, "w") as f:
768789
yaml.dump(graph_info, f, Dumper=Dumper, default_flow_style=False)

python/graphscope/tests/unittest/test_gar.py

+7-5
Original file line number | Diff line number | Diff line change
@@ -18,19 +18,21 @@
1818

1919
import os
2020

21+
from graphscope.framework.graph import Graph
22+
2123
gar_test_repo_dir = os.path.expandvars("${GS_TEST_DIR}")
2224

2325

2426
def test_load_from_gar(graphscope_session):
2527
graph_yaml = os.path.join(
2628
gar_test_repo_dir, "graphar/ldbc_sample/parquet/ldbc_sample.graph.yml"
2729
)
28-
print(graph_yaml)
29-
graph = graphscope_session.load_from_gar(graph_yaml)
30+
graph_yaml_path = "graphar+file://" + graph_yaml
31+
print(graph_yaml_path)
32+
g = Graph.load_from(graph_yaml_path)
3033
assert graph.schema is not None
3134
del graph
3235

3336

34-
def test_archive_to_gar(ldbc_graph):
35-
graph_yaml = os.path.join(gar_test_repo_dir, "graphar/ldbc/ldbc.graph.yml")
36-
ldbc_graph.archive(graph_yaml)
37+
def test_save_to_graphar(ldbc_graph):
38+
ldbc_graph.save_to("/tmp/")

0 commit comments

Comments
 (0)