@@ -6,25 +6,16 @@ import org.apache.spark.rdd.RDD
6
6
import org .apache .spark .rdd .RDD .rddToPairRDDFunctions
7
7
import org .apache .spark .sql .SparkSession
8
8
import org .apache .spark .storage .StorageLevel
9
+ import org .slf4j .Logger
9
10
import org .slf4j .LoggerFactory
10
11
12
+ import ch .ninecode .cim .CIMRDD
11
13
import ch .ninecode .model ._
12
14
13
- class Transformers (session : SparkSession , storage_level : StorageLevel ) extends Serializable
15
+ class Transformers (session : SparkSession , storage_level : StorageLevel ) extends CIMRDD with Serializable
14
16
{
15
- val log = LoggerFactory .getLogger (getClass)
16
-
17
- def get (name : String ): RDD [Element ] =
18
- {
19
- val rdds = session.sparkContext.getPersistentRDDs
20
- for (key <- rdds.keys)
21
- {
22
- val rdd = rdds (key)
23
- if (rdd.name == name)
24
- return (rdd.asInstanceOf [RDD [Element ]])
25
- }
26
- return (null )
27
- }
17
+ implicit val spark : SparkSession = session
18
+ implicit val log : Logger = LoggerFactory .getLogger (getClass)
28
19
29
20
/**
30
21
* Handle a join of power transformers and stations with the power transformer ends.
@@ -33,7 +24,7 @@ class Transformers (session: SparkSession, storage_level: StorageLevel) extends
33
24
* @param x - the list of transformers and stations keyed by transformer id, joined with transformer ends
34
25
* @return a list of a pair of transformer name and a four-tuple of transformer, station, high side end with voltage and low side end with voltage
35
26
*/
36
- def addEnds (voltages : Map [String , Double ]) (x : Tuple2 [ String ,Tuple2 [ Tuple2 [ PowerTransformer ,Substation ], Option [Iterable [PowerTransformerEnd ]]]]) =
27
+ def addEnds (voltages : Map [String , Double ]) (x : ( String , (( PowerTransformer , Substation ), Option [Iterable [PowerTransformerEnd ]]))) : List [( String , ( PowerTransformer , Substation , ( PowerTransformerEnd , Double ), ( PowerTransformerEnd , Double )))] =
37
28
{
38
29
val ends = x._2._2 match
39
30
{
@@ -42,17 +33,17 @@ class Transformers (session: SparkSession, storage_level: StorageLevel) extends
42
33
case None =>
43
34
Array [PowerTransformerEnd ]()
44
35
}
45
- if (ends.size > 2 )
36
+ if (ends.length > 2 )
46
37
log.warn (" more than two transformer ends for " + x._1)
47
- val ret = if (ends.size == 0 )
38
+ val ret = if (ends.length == 0 )
48
39
{
49
40
log.error (" no transformer ends for " + x._1)
50
- List [Tuple2 [ String ,Tuple4 [ PowerTransformer ,Substation ,Tuple2 [ PowerTransformerEnd ,Double ], Tuple2 [ PowerTransformerEnd ,Double ]]] ]()
41
+ List [( String , ( PowerTransformer , Substation , ( PowerTransformerEnd , Double ), ( PowerTransformerEnd , Double ))) ]()
51
42
}
52
- else if (ends.size == 1 )
43
+ else if (ends.length == 1 )
53
44
{
54
45
log.error (" less than two transformer ends for " + x._1)
55
- List [Tuple2 [ String ,Tuple4 [ PowerTransformer ,Substation ,Tuple2 [ PowerTransformerEnd ,Double ], Tuple2 [ PowerTransformerEnd ,Double ]]] ]()
46
+ List [( String , ( PowerTransformer , Substation , ( PowerTransformerEnd , Double ), ( PowerTransformerEnd , Double ))) ]()
56
47
}
57
48
else
58
49
{
@@ -72,7 +63,7 @@ class Transformers (session: SparkSession, storage_level: StorageLevel) extends
72
63
* NOTE: Skips transformers with terminals not matching the transformer ends
73
64
* @return a list of four-tuples of transformer, station, and two three-tuples of end, voltage and terminal
74
65
*/
75
- def addTerminals (x : Tuple2 [ String ,Tuple2 [ Tuple4 [ PowerTransformer , Substation , Tuple2 [ PowerTransformerEnd ,Double ], Tuple2 [ PowerTransformerEnd ,Double ]], Option [Iterable [Terminal ]]]]) =
66
+ def addTerminals (x : ( String , (( PowerTransformer , Substation , ( PowerTransformerEnd , Double ), ( PowerTransformerEnd , Double )), Option [Iterable [Terminal ]]))) : List [( PowerTransformer , Substation , ( PowerTransformerEnd , Double , Terminal ), ( PowerTransformerEnd , Double , Terminal ))] =
76
67
{
77
68
val terminals = x._2._2 match
78
69
{
@@ -95,28 +86,28 @@ class Transformers (session: SparkSession, storage_level: StorageLevel) extends
95
86
List ((x._2._1._1, x._2._1._2, (end1._1, end1._2, t1), (end2._1, end2._2, t2)))
96
87
case None =>
97
88
log.error (" terminal not found for " + end2._1.id)
98
- List [Tuple4 [ PowerTransformer ,Substation ,Tuple3 [ PowerTransformerEnd ,Double ,Terminal ], Tuple3 [ PowerTransformerEnd ,Double ,Terminal ]] ]()
89
+ List [( PowerTransformer , Substation , ( PowerTransformerEnd , Double , Terminal ), ( PowerTransformerEnd , Double , Terminal )) ]()
99
90
}
100
91
case None =>
101
92
match1 match
102
93
{
103
- case Some (t1 ) =>
94
+ case Some (_ ) =>
104
95
log.error (" terminal not found for " + end1._1.id)
105
- List [Tuple4 [ PowerTransformer ,Substation ,Tuple3 [ PowerTransformerEnd ,Double ,Terminal ], Tuple3 [ PowerTransformerEnd ,Double ,Terminal ]] ]()
96
+ List [( PowerTransformer , Substation , ( PowerTransformerEnd , Double , Terminal ), ( PowerTransformerEnd , Double , Terminal )) ]()
106
97
case None =>
107
98
log.error (" no terminals not found for " + x._2._1._1.id)
108
- List [Tuple4 [ PowerTransformer ,Substation ,Tuple3 [ PowerTransformerEnd ,Double ,Terminal ], Tuple3 [ PowerTransformerEnd ,Double ,Terminal ]] ]()
99
+ List [( PowerTransformer , Substation , ( PowerTransformerEnd , Double , Terminal ), ( PowerTransformerEnd , Double , Terminal )) ]()
109
100
}
110
101
}
111
102
ret
112
103
}
113
104
114
- def addSC (arg : ((PowerTransformer , Substation , (PowerTransformerEnd , Double , Terminal ), (PowerTransformerEnd , Double , Terminal )), Option [ShortCircuitData ])) =
105
+ def addSC (arg : ((PowerTransformer , Substation , (PowerTransformerEnd , Double , Terminal ), (PowerTransformerEnd , Double , Terminal )), Option [ShortCircuitData ])): ( PowerTransformer , Substation , ( PowerTransformerEnd , Double , Terminal ), ( PowerTransformerEnd , Double , Terminal ), ShortCircuitData ) =
115
106
{
116
107
arg._2 match
117
108
{
118
109
case Some (sc) => (arg._1._1, arg._1._2, arg._1._3, arg._1._4, sc)
119
- case None => (arg._1._1, arg._1._2, arg._1._3, arg._1._4, ShortCircuitData (arg._1._2.id, 200 , - 70 , false ))
110
+ case None => (arg._1._1, arg._1._2, arg._1._3, arg._1._4, ShortCircuitData (arg._1._2.id, 200 , - 70 , valid = false ))
120
111
}
121
112
}
122
113
@@ -133,54 +124,42 @@ class Transformers (session: SparkSession, storage_level: StorageLevel) extends
133
124
}
134
125
135
126
// get all transformers in substations
136
- val transformers = get ( " PowerTransformer " ). asInstanceOf [ RDD [ PowerTransformer ] ]
137
- val substation_transformers = transformers.filter ((t : PowerTransformer ) => { ( t.ConductingEquipment .Equipment .PowerSystemResource .IdentifiedObject .name != " Messen_Steuern" ) } )
127
+ val transformers = get[ PowerTransformer ]
128
+ val substation_transformers = transformers.filter ((t : PowerTransformer ) => t.ConductingEquipment .Equipment .PowerSystemResource .IdentifiedObject .name != " Messen_Steuern" )
138
129
139
130
// get an RDD of substations by filtering out distribution boxes
140
- val stations = get ( " Substation " ). asInstanceOf [ RDD [ Substation ] ].filter (_.EquipmentContainer .ConnectivityNodeContainer .PowerSystemResource .PSRType != " PSRType_DistributionBox" )
131
+ val stations = get[ Substation ].filter (_.EquipmentContainer .ConnectivityNodeContainer .PowerSystemResource .PSRType != " PSRType_DistributionBox" )
141
132
142
133
// the equipment container for a transformer could be a Bay, VoltageLevel or Station... the first two of which have a reference to their station
143
- def station_fn (x : Tuple2 [ String , Any ] ) =
134
+ def station_fn (x : ( String , Any ) ) =
144
135
{
145
136
x match
146
137
{
147
- case (key : String , (t : PowerTransformer , station : Substation )) =>
148
- {
149
- (station.id, t)
150
- }
151
- case (key : String , (t : PowerTransformer , bay : Bay )) =>
152
- {
153
- (bay.Substation , t)
154
- }
155
- case (key : String , (t : PowerTransformer , level : VoltageLevel )) =>
156
- {
157
- (level.Substation , t)
158
- }
159
- case _ =>
160
- {
161
- throw new Exception (" this should never happen -- default case" )
162
- }
138
+ case (_, (t : PowerTransformer , station : Substation )) => (station.id, t)
139
+ case (_, (t : PowerTransformer , bay : Bay )) => (bay.Substation , t)
140
+ case (_, (t : PowerTransformer , level : VoltageLevel )) => (level.Substation , t)
141
+ case _ => throw new Exception (" this should never happen -- default case" )
163
142
}
164
143
}
165
144
166
145
// create an RDD of container-transformer pairs, e.g. { (KAB8526,TRA13730), (STA4551,TRA4425), ... }
167
- val elements = get (" Elements" ). asInstanceOf [ RDD [ Element ]]
146
+ val elements = get[ Element ] (" Elements" )
168
147
val tpairs = substation_transformers.keyBy(_.ConductingEquipment .Equipment .EquipmentContainer ).join (elements.keyBy (_.id)).map (station_fn)
169
148
170
149
// only keep the pairs where the transformer is in a substation we have
171
150
val transformers_stations = tpairs.join (stations.keyBy (_.id)).values
172
151
173
152
// get the transformer ends keyed by transformer
174
- val ends = get ( " PowerTransformerEnd " ). asInstanceOf [ RDD [ PowerTransformerEnd ] ].groupBy (_.PowerTransformer )
153
+ val ends = get[ PowerTransformerEnd ].groupBy (_.PowerTransformer )
175
154
176
155
// get a map of voltages
177
- val voltages = get ( " BaseVoltage " ). asInstanceOf [ RDD [ BaseVoltage ] ].map ((v) => (v.id, v.nominalVoltage)).collectAsMap ()
156
+ val voltages = get[ BaseVoltage ].map ((v) => (v.id, v.nominalVoltage)).collectAsMap ()
178
157
179
158
// attach PowerTransformerEnd elements
180
159
val transformers_stations_plus_ends = transformers_stations.keyBy (_._1.id).leftOuterJoin (ends).flatMap (addEnds (voltages))
181
160
182
161
// get the terminals keyed by transformer
183
- val terms = get ( " Terminal " ). asInstanceOf [ RDD [ Terminal ] ].groupBy (_.ConductingEquipment )
162
+ val terms = get[ Terminal ].groupBy (_.ConductingEquipment )
184
163
185
164
// attach Terminal elements
186
165
val transformers_stations_plus_ends_plus_terminals = transformers_stations_plus_ends.leftOuterJoin (terms).flatMap (addTerminals)
@@ -205,10 +184,11 @@ class Transformers (session: SparkSession, storage_level: StorageLevel) extends
205
184
transformer_data.persist (storage_level)
206
185
session.sparkContext.getCheckpointDir match
207
186
{
208
- case Some (dir ) => transformer_data.checkpoint ()
187
+ case Some (_ ) => transformer_data.checkpoint ()
209
188
case None =>
210
189
}
211
190
212
- return ( transformer_data)
191
+ transformer_data
213
192
}
214
193
}
194
+
0 commit comments