Skip to content

Local scoring (aka Sparkless) using Aardpfark #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 41 commits into from
Aug 30, 2018
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
b9028c4
Initial draft of PFA based scoring (aka Sparkless)
tovbinm Aug 8, 2018
ced9ee7
Merge branch 'master' into mt/pfa-local
tovbinm Aug 8, 2018
9e986a7
Merge branch 'master' into mt/pfa-local
tovbinm Aug 8, 2018
470f841
Merge branch 'master' into mt/pfa-local
tovbinm Aug 8, 2018
eda18ad
Merge branch 'master' into mt/pfa-local
tovbinm Aug 9, 2018
2320bbf
Merge branch 'master' into mt/pfa-local
tovbinm Aug 10, 2018
f7d690e
Merge branch 'master' into mt/pfa-local
tovbinm Aug 11, 2018
3ddbfa7
Merge branch 'master' into mt/pfa-local
tovbinm Aug 15, 2018
214c181
Merge branch 'master' into mt/pfa-local
tovbinm Aug 17, 2018
e0617bc
Merge branch 'master' into mt/pfa-local
tovbinm Aug 23, 2018
43ccba3
Use official hadrian release
tovbinm Aug 23, 2018
ceab612
update
tovbinm Aug 23, 2018
0bb6172
Merge branch 'master' into mt/pfa-local
tovbinm Aug 25, 2018
dd45409
Merge branch 'master' into mt/pfa-local
tovbinm Aug 25, 2018
5d1f8b7
Merge branch 'master' into mt/pfa-local
tovbinm Aug 25, 2018
2053037
refactoring
tovbinm Aug 27, 2018
4d6b711
Merge branch 'master' into mt/pfa-local
tovbinm Aug 27, 2018
855217b
pfa seems to work
tovbinm Aug 27, 2018
ebacf5e
Merge branch 'master' into mt/pfa-local
tovbinm Aug 27, 2018
f1f3ce0
make it all work
tovbinm Aug 27, 2018
f621947
Merge branch 'master' of github.com:salesforce/TransmogrifAI into mt/…
tovbinm Aug 27, 2018
87c364e
Merge branch 'mt/pfa-local' of github.com:salesforce/TransmogrifAI in…
tovbinm Aug 27, 2018
645a071
cleanup
tovbinm Aug 27, 2018
fcdaae4
minor cleanups
tovbinm Aug 27, 2018
0021d3c
Merge branch 'master' into mt/pfa-local
tovbinm Aug 27, 2018
0808f78
use json4s cause it's faster
tovbinm Aug 28, 2018
d73baba
cleanup
tovbinm Aug 28, 2018
74d8c26
cleanup2
tovbinm Aug 28, 2018
7ab91f1
Merge branch 'master' into mt/pfa-local
tovbinm Aug 28, 2018
68135c5
Merge branch 'master' into mt/pfa-local
tovbinm Aug 28, 2018
b796b1e
Update build.gradle
tovbinm Aug 29, 2018
7b2cdcf
updated toMap function
tovbinm Aug 29, 2018
67e42e5
revert
tovbinm Aug 29, 2018
aebef60
Merge branch 'mt/pfa-local' of github.com:salesforce/TransmogrifAI in…
tovbinm Aug 29, 2018
377d52f
nicefy
tovbinm Aug 29, 2018
c84c85a
Merge branch 'master' into mt/pfa-local
tovbinm Aug 29, 2018
8ac9daa
Merge branch 'master' of github.com:salesforce/TransmogrifAI into mt/…
tovbinm Aug 30, 2018
05ea9c4
Merge branch 'master' into mt/pfa-local
tovbinm Aug 30, 2018
4d72d05
Merge branch 'master' into mt/pfa-local
tovbinm Aug 30, 2018
91e9248
Added comment
tovbinm Aug 30, 2018
903ad95
Merge branch 'master' into mt/pfa-local
tovbinm Aug 30, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ configure(allProjs) {
commonsValidatorVersion = '1.6'
commonsIOVersion = '2.6'
scoveragePluginVersion = '1.3.1'
hadrianVersion = '0.8.5'
aardpfarkVersion = '0.1.0-SNAPSHOT'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we pulling in a shapshot?


mainClassName = 'com.salesforce.Main'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,27 @@ object RichRow {
def getFeatureType[T <: FeatureType](f: TransientFeature)(implicit conv: FeatureTypeSparkConverter[T]): T =
conv.fromSpark(getAny(f.name))

/**
* Converts row to a [[collection.mutable.Map]]
*
* @return a [[collection.mutable.Map]] with row contents
*/
def toMutableMap: collection.mutable.Map[String, Any] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so you are saying that row.getValuesMap[Any] should work as well? let me try.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oook, so my function is faster, because getValuesMap calls def getAs[T](fieldName: String): T = getAs[T](fieldIndex(fieldName)) for each value, while my function operates on indices.

val res = collection.mutable.Map.empty[String, Any]
val fields = row.schema.fields
for {i <- 0 until row.size} {
res += fields(i).name -> row(i)
}
res
}

/**
* Converts row to a [[collection.immutable.Map]]
*
* @return a [[collection.immutable.Map]] with row contents
*/
def toMap: Map[String, Any] = toMutableMap.toMap

}

}
17 changes: 17 additions & 0 deletions local/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
repositories {
// TODO: remove once Aardpfark release if official
maven { url 'https://jitpack.io' }
}

dependencies {
compile project(':core')
testCompile project(':testkit')

// PFA serialization for Spark models
// TODO: replace with official Aardpfark release when ready
compile "com.github.relateiq:aardpfark:$aardpfarkVersion"

// Hadrian PFA runtime for JVM
compileOnly "com.opendatagroup:hadrian:$hadrianVersion"
testRuntime "com.opendatagroup:hadrian:$hadrianVersion"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.local

import com.ibm.aardpfark.spark.ml.SparkSupport
import com.opendatagroup.hadrian.jvmcompiler.PFAEngine
import com.salesforce.op.OpWorkflowModel
import com.salesforce.op.stages.sparkwrappers.generic.SparkWrapperParams
import com.salesforce.op.stages.{OPStage, OpTransformer}
import org.apache.spark.ml.SparkMLSharedParamConstants._
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param.ParamMap
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization

import scala.collection.mutable

/**
* Enrichment for [[OpWorkflowModel]] to allow local scoring functionality
*/
trait OpWorkflowModelLocal {

/**
* Enrichment for [[OpWorkflowModel]] to allow local scoring functionality
*
* @param model [[OpWorkflowModel]]
*/
implicit class RichOpWorkflowModel(model: OpWorkflowModel) {

private implicit val formats = DefaultFormats

/**
* Internal PFA model representation
*
* @param inputs mode inputs mappings
* @param output output mapping
* @param engine PFA engine
*/
private case class PFAModel
(
inputs: Map[String, String],
output: (String, String),
engine: PFAEngine[AnyRef, AnyRef]
)

/**
* Internal OP model representation
*
* @param output output name
* @param model model instance
*/
private case class OPModel(output: String, model: OPStage with OpTransformer)

/**
* Prepares a score function for local scoring
*
* @return score function for local scoring
*/
def scoreFunction: ScoreFunction = {
// Prepare the stages for scoring
val stagesWithIndex = model.stages.zipWithIndex
// Collect all OP stages
val opStages = stagesWithIndex.collect { case (s: OpTransformer, i) => OPModel(s.getOutputFeatureName, s) -> i }
// Collect all Spark wrapped stages
val sparkStages = stagesWithIndex.filterNot(_._1.isInstanceOf[OpTransformer]).collect {
case (s: OPStage with SparkWrapperParams[_], i) if s.getSparkMlStage().isDefined =>
(s, s.getSparkMlStage().get.asInstanceOf[Transformer].copy(ParamMap.empty), i)
}
// Convert Spark wrapped stages into PFA models
val pfaStages = sparkStages.map { case (opStage, sparkStage, i) => toPFAModel(opStage, sparkStage) -> i }
// Combine all stages and apply the original order
val allStages = (opStages ++ pfaStages).sortBy(_._2).map(_._1)
val resultFeatures = model.getResultFeatures().map(_.name).toSet

// Score Function
input: Map[String, Any] => {
val inputMap = mutable.Map.empty ++= input
val transformedRow = allStages.foldLeft(inputMap) {
// For OP Models we simply call transform
case (row, OPModel(output, stage)) =>
row += output -> stage.transformKeyValue(row.apply)

// For PFA Models we execute PFA engine action with json in/out
case (row, PFAModel(inputs, (out, outCol), engine)) =>
val inJson = rowToJson(row, inputs)
val engineIn = engine.jsonInput(inJson)
val engineOut = engine.action(engineIn)
val resMap = parse(engineOut.toString).extract[Map[String, Any]]
row += out -> resMap(outCol)
}
transformedRow.filterKeys(resultFeatures.contains).toMap
}
}

/**
* Convert Spark wrapped staged into PFA Models
*/
private def toPFAModel(opStage: OPStage with SparkWrapperParams[_], sparkStage: Transformer): PFAModel = {
// Update input/output params for Spark stages to default ones
val inParam = sparkStage.getParam(inputCol.name)
val outParam = sparkStage.getParam(outputCol.name)
val inputs = opStage.getInputFeatures().map(_.name).map {
case n if sparkStage.get(inParam).contains(n) => n -> inputCol.name
case n if sparkStage.get(outParam).contains(n) => n -> outputCol.name
case n => n -> n
}.toMap
val output = opStage.getOutputFeatureName
sparkStage.set(inParam, inputCol.name).set(outParam, outputCol.name)
val pfaJson = SparkSupport.toPFA(sparkStage, pretty = true)
val pfaEngine = PFAEngine.fromJson(pfaJson).head
PFAModel(inputs, (output, outputCol.name), pfaEngine)
}

/**
* Convert row of Spark values into a json convertible Map
* See [[FeatureTypeSparkConverter.toSpark]] for all possible values - we invert them here
*/
private def rowToJson(row: mutable.Map[String, Any], inputs: Map[String, String]): String = {
val in = inputs.map { case (k, v) => (v, row.get(k)) }.mapValues {
case Some(v: Vector) => v.toArray
case Some(v: mutable.WrappedArray[_]) => v.toArray(v.elemTag)
case Some(v: Map[_, _]) => v.mapValues {
case v: mutable.WrappedArray[_] => v.toArray(v.elemTag)
case x => x
}
case None | Some(null) => null
case Some(v) => v
}
Serialization.write(in)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.local

import com.salesforce.op.{OpParams, OpWorkflow}


/**
* A class for running TransmogrifAI Workflow without Spark.
*
* @param workflow the workflow that you want to run (Note: the workflow should have the resultFeatures set)
*/
class OpWorkflowRunnerLocal(val workflow: OpWorkflow) {

/**
* Load the model & prepare a score function for local scoring
*
* @param params params to use during scoring
* @return score function for local scoring
*/
def score(params: OpParams): ScoreFunction = {
require(params.modelLocation.isDefined, "Model location must be set in params")
val model = workflow.loadModel(params.modelLocation.get)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will the standard load method work on spark models that use parquet storage without a spark context?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of the spark ml readers require the context explicitly, but I will need to verify, cause they might get/create spark context inside. Do you have a model in mind that I can check against?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe try PCA

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

model.scoreFunction
}

}
41 changes: 41 additions & 0 deletions local/src/main/scala/com/salesforce/op/local/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op


package object local extends OpWorkflowModelLocal {

/**
* Score function for local scoring: raw record => transformed record
*/
type ScoreFunction = Map[String, Any] => Map[String, Any]

}
Loading