Skip to content

Commit 570867f

Browse files
committed
fix CIM load options
1 parent 47d9e94 commit 570867f

File tree

8 files changed

+253
-39
lines changed

8 files changed

+253
-39
lines changed

CIMConnector/src/main/java/ch/ninecode/cim/connector/CIMConnectionRequestInfo.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,25 +77,32 @@ public int hashCode ()
7777
public String toString ()
7878
{
7979
StringBuilder sb = new StringBuilder ();
80-
81-
sb.append ("[@");
80+
sb.append ("[master: ");
8281
sb.append (getMaster ());
83-
sb.append (" + ");
82+
sb.append (" + cassandra: ");
8483
sb.append (getCassandra ());
85-
sb.append (": ");
86-
for (String key: getProperties ().keySet ())
84+
if (0 != getJars ().size ())
8785
{
88-
sb.append (key);
89-
sb.append ("=");
90-
sb.append (getProperties ().get (key));
86+
sb.append (" properties (");
87+
for (String key: getProperties ().keySet ())
88+
{
89+
sb.append (key);
90+
sb.append ("=");
91+
sb.append (getProperties ().get (key));
92+
sb.append (" ");
93+
}
94+
sb.setLength (sb.length () - 1);
95+
sb.append (")");
9196
}
9297
if (0 != getJars ().size ())
9398
{
94-
sb.append (" (");
99+
sb.append (" jars (");
95100
for (String jar: getJars ())
96101
{
97102
sb.append (jar);
103+
sb.append (" ");
98104
}
105+
sb.setLength (sb.length () - 1);
99106
sb.append (")");
100107
}
101108
sb.append ("]");

CIMConnector/src/main/java/ch/ninecode/cim/connector/CIMManagedConnection.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,19 @@ public void connect (Subject subject, ConnectionRequestInfo info)
204204
if ((null != cassandra) && !cassandra.equals (""))
205205
configuration.set ("spark.cassandra.connection.host", cassandra);
206206

207+
if (null != System.getProperty ("SPARK_HOME"))
208+
System.setProperty ("spark.home", System.getProperty ("SPARK_HOME"));
209+
if (null != System.getProperty ("HADOOP_HOME"))
210+
{
211+
// ToDo: read from conf/spark-defaults.conf
212+
System.setProperty ("spark.driver.extraLibraryPath", System.getProperty ("HADOOP_HOME") + "/lib/native");
213+
System.setProperty ("spark.executor.extraLibraryPath", System.getProperty ("HADOOP_HOME") + "/lib/native");
214+
}
215+
configuration.set ("spark.sql.warehouse.dir", "file:/tmp/spark-warehouse");
216+
// need hive jars too:
217+
// configuration.set ("spark.sql.catalogImplementation", "hive");
218+
configuration.set ("spark.submit.deployMode", "client");
219+
207220
// add the other properties
208221
for (String key : _RequestInfo.getProperties ().keySet ())
209222
configuration.set (key, _RequestInfo.getProperties ().get (key));
@@ -223,9 +236,6 @@ public void connect (Subject subject, ConnectionRequestInfo info)
223236
jars[size] = j2ee;
224237
configuration.setJars (jars);
225238

226-
if (null != logger)
227-
logger.println ("SparkConf = " + configuration.toDebugString ());
228-
229239
// so far, it only works for Spark standalone (as above with master set to spark://sandbox:7077
230240
// here are some options I tried for Yarn access master set to "yarn-client" that didn't work
231241
// configuration.setMaster ("yarn-client"); // assumes a resource manager is specified in yarn-site.xml, e.g. sandbox:8032
@@ -235,6 +245,9 @@ public void connect (Subject subject, ConnectionRequestInfo info)
235245
// register CIMReader classes
236246
configuration.registerKryoClasses (CIMClasses.list ());
237247

248+
if (null != logger)
249+
logger.println ("SparkConf:\n" + configuration.toDebugString ());
250+
238251
// setting spark.executor.memory as a property of SparkConf doesn't work:
239252
if (null != _RequestInfo.getProperties ().get ("spark.executor.memory"))
240253
System.setProperty ("spark.executor.memory", _RequestInfo.getProperties ().get ("spark.executor.memory"));

CIMWeb/src/main/scala/ch/ninecode/cim/cimweb/LoadCIMFileFunction.scala

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@ package ch.ninecode.cim.cimweb
33
import javax.json.Json
44
import javax.json.JsonStructure
55

6+
import ch.ninecode.cim.CIMEdges
7+
import ch.ninecode.cim.CIMJoin
8+
import ch.ninecode.cim.CIMNetworkTopologyProcessor
69
import org.apache.hadoop.fs.Path
710
import org.apache.spark.sql.SparkSession
11+
import org.apache.spark.storage.StorageLevel
812

913
import scala.collection.mutable.HashMap
1014

@@ -24,25 +28,75 @@ case class LoadCIMFileFunction (paths: Array[String], options: Iterable[(String,
2428
for (f <- files)
2529
ff.add (f)
2630
response.add ("files", ff)
27-
val reader_options = new HashMap[String, String] ()
28-
if (null != options)
29-
reader_options ++= options
30-
else
31+
32+
// establish default options if needed
33+
val op = if (null == options)
3134
{
32-
reader_options.put ("StorageLevel", "MEMORY_AND_DISK_SER")
33-
reader_options.put ("ch.ninecode.cim.make_edges", "false")
34-
reader_options.put ("ch.ninecode.cim.do_join", "false")
35-
reader_options.put ("ch.ninecode.cim.do_topo", "false")
36-
reader_options.put ("ch.ninecode.cim.do_topo_islands", "false")
37-
reader_options.put ("ch.ninecode.cim.do_deduplication", if (1 < files.length) "true" else "false")
35+
List (
36+
("StorageLevel", "MEMORY_AND_DISK_SER"),
37+
("ch.ninecode.cim.do_about", "false"),
38+
("ch.ninecode.cim.do_normalize", "false"),
39+
("ch.ninecode.cim.do_deduplication", if (1 < files.length) "true" else "false"),
40+
("ch.ninecode.cim.make_edges", "false"),
41+
("ch.ninecode.cim.do_join", "false"),
42+
("ch.ninecode.cim.do_topo_islands", "false"),
43+
("ch.ninecode.cim.do_topo", "false"),
44+
("ch.ninecode.cim.split_maxsize", "67108864")
45+
)
3846
}
47+
else
48+
options
49+
50+
// echo settings to the response
3951
val opts = Json.createObjectBuilder
40-
for (pair <- reader_options)
52+
for (pair <- op)
4153
opts.add (pair._1, pair._2)
4254
response.add ("options", opts)
55+
56+
// there is a problem (infinite loop) if post processing is done in the CIMReader
57+
// so we extract out topo, edge, and join processing
58+
var topo = false
59+
var isld = false
60+
var join = false
61+
var edge = false
62+
val reader_options = new HashMap[String, String] ()
63+
for (option <- op)
64+
option._1 match
65+
{
66+
case "ch.ninecode.cim.do_topo" =>
67+
topo = isld || (try { option._2.toBoolean } catch { case _: Throwable => false })
68+
case "ch.ninecode.cim.do_topo_islands" =>
69+
isld = try { option._2.toBoolean } catch { case _: Throwable => false }
70+
topo = topo || isld
71+
case "ch.ninecode.cim.do_join" =>
72+
join = try { option._2.toBoolean } catch { case _: Throwable => false }
73+
case "ch.ninecode.cim.make_edges" =>
74+
edge = try { option._2.toBoolean } catch { case _: Throwable => false }
75+
case _ =>
76+
reader_options.put (option._1, option._2)
77+
}
4378
reader_options.put ("path", files.mkString (",")) // ToDo: why is this still needed?
79+
4480
val elements = spark.read.format ("ch.ninecode.cim").options (reader_options).load (files:_*)
45-
val count = elements.count
81+
var count = elements.count
82+
if (topo)
83+
{
84+
val ntp = new CIMNetworkTopologyProcessor (spark, org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER, true)
85+
val elements2 = ntp.process (isld)
86+
count = elements2.count
87+
}
88+
if (join)
89+
{
90+
val join = new CIMJoin (spark, StorageLevel.fromString ("MEMORY_AND_DISK_SER"))
91+
val elements3 = join.do_join ()
92+
count = elements3.count
93+
}
94+
if (edge)
95+
{
96+
val edges = new CIMEdges (spark, StorageLevel.fromString ("MEMORY_AND_DISK_SER"))
97+
val elements4 = edges.make_edges (topo)
98+
count = elements4.count
99+
}
46100
response.add ("elements", count)
47101
}
48102
catch

CIMWeb/src/main/scala/ch/ninecode/cim/cimweb/LoadFile.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,14 @@ class LoadFile extends RESTful
3434
@Produces (Array (MediaType.APPLICATION_JSON))
3535
def getFile (
3636
@PathParam ("path") path: String,
37+
@DefaultValue ("false") @MatrixParam ("do_about") do_about: String,
38+
@DefaultValue ("false") @MatrixParam ("do_normalize") do_normalize: String,
3739
@DefaultValue ("false") @MatrixParam ("do_deduplication") do_deduplication: String,
3840
@DefaultValue ("false") @MatrixParam ("make_edges") make_edges: String,
3941
@DefaultValue ("false") @MatrixParam ("do_join") do_join: String,
40-
@DefaultValue ("false") @MatrixParam ("do_topo") do_topo: String,
4142
@DefaultValue ("false") @MatrixParam ("do_topo_islands") do_topo_islands: String,
43+
@DefaultValue ("false") @MatrixParam ("do_topo") do_topo: String,
44+
@DefaultValue ("67108864") @MatrixParam ("split_maxsize") split_maxsize: String,
4245
@DefaultValue ("false") @MatrixParam ("header") header: String,
4346
@DefaultValue ("false") @MatrixParam ("ignoreLeadingWhiteSpace") ignoreLeadingWhiteSpace: String,
4447
@DefaultValue ("false") @MatrixParam ("ignoreTrailingWhiteSpace") ignoreTrailingWhiteSpace: String,
@@ -73,11 +76,14 @@ class LoadFile extends RESTful
7376
val function = filetype match
7477
{
7578
case "CIM" => // see https://github.com/derrickoswald/CIMReader#reader-api
79+
options.put ("ch.ninecode.cim.do_about", do_about)
80+
options.put ("ch.ninecode.cim.do_normalize", do_normalize)
7681
options.put ("ch.ninecode.cim.do_deduplication", do_deduplication)
7782
options.put ("ch.ninecode.cim.make_edges", make_edges)
7883
options.put ("ch.ninecode.cim.do_join", do_join)
79-
options.put ("ch.ninecode.cim.do_topo", do_topo)
8084
options.put ("ch.ninecode.cim.do_topo_islands", do_topo_islands)
85+
options.put ("ch.ninecode.cim.do_topo", do_topo)
86+
options.put ("ch.ninecode.cim.split_maxsize", split_maxsize)
8187
LoadCIMFileFunction (files, options)
8288
case "CSV" => // see https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader
8389
options.put ("header", header)

CIMWeb/src/main/webapp/index.html

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,51 @@
5353
<a href="#" class="dropdown-toggle" data-toggle="dropdown" aria-expanded="false">Load <span class="caret"></span></a>
5454
<ul class="dropdown-menu" role="menu">
5555
<li class="dropdown-header">CIM</li>
56+
<li>
57+
<div class="select">
58+
<label for="storage_level">
59+
<select id='storage_level' class='form-control' name='storage_level'>
60+
<option value="NONE">NONE</option>
61+
<option value="DISK_ONLY">DISK_ONLY</option>
62+
<option value="DISK_ONLY_2">DISK_ONLY_2</option>
63+
<option value="MEMORY_ONLY">MEMORY_ONLY</option>
64+
<option value="MEMORY_ONLY_2">MEMORY_ONLY_2</option>
65+
<option value="MEMORY_ONLY_SER">MEMORY_ONLY_SER</option>
66+
<option value="MEMORY_ONLY_SER_2">MEMORY_ONLY_SER_2</option>
67+
<option value="MEMORY_AND_DISK">MEMORY_AND_DISK</option>
68+
<option value="MEMORY_AND_DISK_2">MEMORY_AND_DISK_2</option>
69+
<option value="MEMORY_AND_DISK_SER" selected>MEMORY_AND_DISK_SER</option>
70+
<option value="MEMORY_AND_DISK_SER_2">MEMORY_AND_DISK_SER_2</option>
71+
<option value="OFF_HEAP">OFF_HEAP</option>
72+
</select>
73+
StorageLevel
74+
</label>
75+
</div>
76+
</li>
77+
<li>
78+
<div class="checkbox">
79+
<label for="do_about">
80+
<input id="do_about" type="checkbox"/>
81+
Merge rdf:about
82+
</label>
83+
</div>
84+
</li>
85+
<li>
86+
<div class="checkbox">
87+
<label for="do_normalize">
88+
<input id="do_normalize" type="checkbox"/>
89+
Normalize data
90+
</label>
91+
</div>
92+
</li>
93+
<li>
94+
<div class="checkbox">
95+
<label for="do_deduplication">
96+
<input id="do_deduplication" type="checkbox"/>
97+
Deduplicate
98+
</label>
99+
</div>
100+
</li>
56101
<li>
57102
<div class="checkbox">
58103
<label for="make_edges">
@@ -63,9 +108,9 @@
63108
</li>
64109
<li>
65110
<div class="checkbox">
66-
<label for="do_topo">
67-
<input id="do_topo" type="checkbox"/>
68-
Make topology
111+
<label for="do_join">
112+
<input id="do_join" type="checkbox"/>
113+
Join NIS-ISU
69114
</label>
70115
</div>
71116
</li>
@@ -77,6 +122,22 @@
77122
</label>
78123
</div>
79124
</li>
125+
<li>
126+
<div class="checkbox">
127+
<label for="do_topo">
128+
<input id="do_topo" type="checkbox"/>
129+
Make topology
130+
</label>
131+
</div>
132+
</li>
133+
<li>
134+
<div class="input">
135+
<label for="split_maxsize">
136+
<input id="split_maxsize" type="text" class="form-control" placeholder="split size (bytes)" style="margin-left: 15px; width: 120px;" value="67108864">
137+
</label>
138+
Split size
139+
</div>
140+
</li>
80141
<li role="separator" class="divider"></li>
81142
<li class="dropdown-header">CSV</li>
82143
<li>

0 commit comments

Comments
 (0)