Local scoring (aka Sparkless) using Aardpfark #41

Merged
41 commits merged on Aug 30, 2018
Changes from 15 commits
Commits
b9028c4
Initial draft of PFA based scoring (aka Sparkless)
tovbinm Aug 8, 2018
ced9ee7
Merge branch 'master' into mt/pfa-local
tovbinm Aug 8, 2018
9e986a7
Merge branch 'master' into mt/pfa-local
tovbinm Aug 8, 2018
470f841
Merge branch 'master' into mt/pfa-local
tovbinm Aug 8, 2018
eda18ad
Merge branch 'master' into mt/pfa-local
tovbinm Aug 9, 2018
2320bbf
Merge branch 'master' into mt/pfa-local
tovbinm Aug 10, 2018
f7d690e
Merge branch 'master' into mt/pfa-local
tovbinm Aug 11, 2018
3ddbfa7
Merge branch 'master' into mt/pfa-local
tovbinm Aug 15, 2018
214c181
Merge branch 'master' into mt/pfa-local
tovbinm Aug 17, 2018
e0617bc
Merge branch 'master' into mt/pfa-local
tovbinm Aug 23, 2018
43ccba3
Use official hadrian release
tovbinm Aug 23, 2018
ceab612
update
tovbinm Aug 23, 2018
0bb6172
Merge branch 'master' into mt/pfa-local
tovbinm Aug 25, 2018
dd45409
Merge branch 'master' into mt/pfa-local
tovbinm Aug 25, 2018
5d1f8b7
Merge branch 'master' into mt/pfa-local
tovbinm Aug 25, 2018
2053037
refactoring
tovbinm Aug 27, 2018
4d6b711
Merge branch 'master' into mt/pfa-local
tovbinm Aug 27, 2018
855217b
pfa seems to work
tovbinm Aug 27, 2018
ebacf5e
Merge branch 'master' into mt/pfa-local
tovbinm Aug 27, 2018
f1f3ce0
make it all work
tovbinm Aug 27, 2018
f621947
Merge branch 'master' of github.com:salesforce/TransmogrifAI into mt/…
tovbinm Aug 27, 2018
87c364e
Merge branch 'mt/pfa-local' of github.com:salesforce/TransmogrifAI in…
tovbinm Aug 27, 2018
645a071
cleanup
tovbinm Aug 27, 2018
fcdaae4
minor cleanups
tovbinm Aug 27, 2018
0021d3c
Merge branch 'master' into mt/pfa-local
tovbinm Aug 27, 2018
0808f78
use json4s cause it's faster
tovbinm Aug 28, 2018
d73baba
cleanup
tovbinm Aug 28, 2018
74d8c26
cleanup2
tovbinm Aug 28, 2018
7ab91f1
Merge branch 'master' into mt/pfa-local
tovbinm Aug 28, 2018
68135c5
Merge branch 'master' into mt/pfa-local
tovbinm Aug 28, 2018
b796b1e
Update build.gradle
tovbinm Aug 29, 2018
7b2cdcf
updated toMap function
tovbinm Aug 29, 2018
67e42e5
revert
tovbinm Aug 29, 2018
aebef60
Merge branch 'mt/pfa-local' of github.com:salesforce/TransmogrifAI in…
tovbinm Aug 29, 2018
377d52f
nicefy
tovbinm Aug 29, 2018
c84c85a
Merge branch 'master' into mt/pfa-local
tovbinm Aug 29, 2018
8ac9daa
Merge branch 'master' of github.com:salesforce/TransmogrifAI into mt/…
tovbinm Aug 30, 2018
05ea9c4
Merge branch 'master' into mt/pfa-local
tovbinm Aug 30, 2018
4d72d05
Merge branch 'master' into mt/pfa-local
tovbinm Aug 30, 2018
91e9248
Added comment
tovbinm Aug 30, 2018
903ad95
Merge branch 'master' into mt/pfa-local
tovbinm Aug 30, 2018
@@ -116,6 +116,27 @@ object RichRow {
    def getFeatureType[T <: FeatureType](f: TransientFeature)(implicit conv: FeatureTypeSparkConverter[T]): T =
      conv.fromSpark(getAny(f.name))

    /**
     * Converts row to a [[collection.mutable.Map]]
     *
     * @return a [[collection.mutable.Map]] with row contents
     */
    def toMutableMap: collection.mutable.Map[String, Any] = {

Collaborator Author: So you are saying that row.getValuesMap[Any] should work as well? Let me try.

Collaborator Author: OK, so my function is faster, because getValuesMap calls def getAs[T](fieldName: String): T = getAs[T](fieldIndex(fieldName)) for each value, resolving every field name to an index first, whereas my function operates on indices directly.

      val res = collection.mutable.Map.empty[String, Any]
      val fields = row.schema.fields
      for {i <- 0 until row.size} {
        res += fields(i).name -> row(i)
      }
      res
    }

    /**
     * Converts row to a [[collection.immutable.Map]]
     *
     * @return a [[collection.immutable.Map]] with row contents
     */
    def toMap: Map[String, Any] = toMutableMap.toMap

  }

}
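
The review thread on toMutableMap contrasts name-based and index-based field access. A minimal sketch of that difference follows. It uses a hypothetical SimpleRow stand-in rather than Spark's Row, so SimpleRow, RowToMap, byName and byIndex are all assumptions made for illustration, and the lookup cost in real Spark depends on the schema implementation.

```scala
// Hypothetical stand-in for a row with a schema; this is NOT the Spark API.
final case class SimpleRow(names: Array[String], values: Array[Any]) {
  // Resolves a field name to its position (a linear scan in this sketch)
  def fieldIndex(name: String): Int = {
    val i = names.indexOf(name)
    require(i >= 0, s"Unknown field: $name")
    i
  }

  // Name-based access: every call pays for a name-to-index resolution
  def getAs[T](name: String): T = values(fieldIndex(name)).asInstanceOf[T]
}

object RowToMap {
  // Name-based conversion: one lookup per field
  def byName(row: SimpleRow): Map[String, Any] =
    row.names.map(n => n -> row.getAs[Any](n)).toMap

  // Index-based conversion: walks names and values in lockstep, no lookups
  def byIndex(row: SimpleRow): Map[String, Any] = {
    val res = collection.mutable.Map.empty[String, Any]
    var i = 0
    while (i < row.values.length) {
      res += row.names(i) -> row.values(i)
      i += 1
    }
    res.toMap
  }
}
```

Both conversions return the same map; they differ only in how much work is done per field, which is the point made in the review thread.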
17 changes: 17 additions & 0 deletions local/build.gradle
@@ -0,0 +1,17 @@
repositories {
    // TODO: remove once Aardpfark release is official
    maven { url 'https://jitpack.io' }
}

dependencies {
    compile project(':core')
    testCompile project(':testkit')

    // PFA serialization for Spark models
    // TODO: replace with official Aardpfark release when ready
    compile "com.github.relateiq:aardpfark:0.1.0-SNAPSHOT"

    // Hadrian PFA runtime for JVM
    compileOnly "com.opendatagroup:hadrian:0.8.5"
    testRuntime "com.opendatagroup:hadrian:0.8.5"
}
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.local

import com.ibm.aardpfark.spark.ml.SparkSupport.toPFA
import com.opendatagroup.hadrian.jvmcompiler.PFAEngine
import com.salesforce.op.features.types.OPVector
import com.salesforce.op.stages.{OpPipelineStage, OpTransformer}
import com.salesforce.op.stages.sparkwrappers.generic.SparkWrapperParams
import com.salesforce.op.utils.json.JsonUtils
import com.salesforce.op.{OpParams, OpWorkflow}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.linalg.Vector


/**
 * A class for running TransmogrifAI Workflow without Spark.
 *
 * @param workflow the workflow that you want to run (Note: the workflow should have the resultFeatures set)
 */
class OpWorkflowRunnerLocal(val workflow: OpWorkflow) {

  type ScoreFun = Map[String, Any] => Map[String, Any]

  /**
   * Load the model & prepare a score local function
   *
   * @param params params to use during scoring
   * @return score local function
   */
  def score(params: OpParams): ScoreFun = {
    require(params.modelLocation.isDefined, "Model location must be set in params")
    val model = workflow.loadModel(params.modelLocation.get)

Collaborator: Will the standard load method work on Spark models that use Parquet storage without a Spark context?

Collaborator Author: None of the Spark ML readers require the context explicitly, but I will need to verify, because they might get or create a Spark context internally. Do you have a model in mind that I can check against?

Collaborator: Maybe try PCA.

    val stagesWithIndex = model.stages.zipWithIndex
    val opStages = stagesWithIndex.collect { case (s: OpTransformer, i) => s -> i }
    val sparkStages = stagesWithIndex.filterNot(_._1.isInstanceOf[OpTransformer]).collect {
      case (s: SparkWrapperParams[_], i) => s.getSparkMlStage().map(_ -> i)
      case (s: Transformer, i) => Some(s -> i)
    }.flatten.map(v => v._1.asInstanceOf[Transformer] -> v._2)

    val pfaStages = sparkStages.map { case (s, i) => toPFA(s, pretty = true) -> i }
    val engines = pfaStages.map { case (s, i) => PFAEngine.fromJson(s, multiplicity = 1).head -> i }
    val loadedStages = (opStages ++ engines).sortBy(_._2)

    row: Map[String, Any] => {
      val rowMap = collection.mutable.Map.empty ++ row
      val transformedRow = loadedStages.foldLeft(rowMap) { (r, s) =>
        s match {
          case (s: OpTransformer, _) =>
            r += s.asInstanceOf[OpPipelineStage[_]].getOutputFeatureName -> s.transformKeyValue(r.apply)

          case (e: PFAEngine[AnyRef, AnyRef], i) =>
            val stage = stagesWithIndex.find(_._2 == i).map(_._1.asInstanceOf[OpPipelineStage[_]]).get
            val outName = stage.getOutputFeatureName
            val inputName = stage.getInputFeatures().collect {
              case f if f.isSubtypeOf[OPVector] => f.name
            }.head
            val vector = r(inputName).asInstanceOf[Vector].toArray
            val input = s"""{"$inputName":${vector.mkString("[", ",", "]")}}"""
            val res = e.action(e.jsonInput(input)).toString

Collaborator Author: @MLnick, is using JSON the most efficient way to call the engine action?

            r += outName -> JsonUtils.fromString[Map[String, Any]](res).get
        }
      }
      val resultFeatures = model.getResultFeatures().map(_.name)
      transformedRow.collect { case r@(k, _) if resultFeatures.contains(k) => r }.toMap
    }
  }

}
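
The score method above folds an ordered list of stages over a row, each stage adding one named output, and then keeps only the result features. A minimal, Spark-free sketch of that pattern follows. LocalScoringSketch, Stage, scoreFunction and vectorJson are hypothetical names introduced for illustration and are not part of TransmogrifAI, Aardpfark or Hadrian. The sketch also uses an immutable map where the diff uses a mutable one.

```scala
object LocalScoringSketch {
  type Row = Map[String, Any]
  type ScoreFun = Row => Row

  // Hypothetical stage: reads the row built so far and produces one named output
  final case class Stage(outputName: String, transform: Row => Any)

  // Builds a scoring function that applies the stages in order and
  // keeps only the requested result features
  def scoreFunction(stages: Seq[Stage], resultFeatures: Set[String]): ScoreFun =
    row => {
      val transformed = stages.foldLeft(row) { (r, s) =>
        r + (s.outputName -> s.transform(r))
      }
      transformed.filter { case (k, _) => resultFeatures.contains(k) }
    }

  // Renders a numeric vector as a single-field JSON object, mirroring the
  // string interpolation the diff uses to build the PFA engine input
  def vectorJson(name: String, vector: Array[Double]): String =
    s"""{"$name":${vector.mkString("[", ",", "]")}}"""
}
```

A usage sketch under the same assumptions: with stages Stage("sum", ...) and Stage("doubled", ...), calling scoreFunction(stages, Set("doubled")) returns a function that maps an input row to a row containing only the "doubled" key. Later stages can read the outputs of earlier ones because each step of the fold passes the accumulated row forward.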
@@ -0,0 +1,94 @@

package com.salesforce.op.local

import java.io.File

import com.salesforce.op.stages.impl.classification.BinaryClassificationModelSelector
import com.salesforce.op.stages.impl.classification.BinaryClassificationModelsToTry._
import com.salesforce.op.test.{PassengerSparkFixtureTest, TestCommon}
import com.salesforce.op.utils.spark.RichRow._
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.{OpParams, OpWorkflow}
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner


@RunWith(classOf[JUnitRunner])
class OpWorkflowRunnerLocalTest extends FlatSpec with PassengerSparkFixtureTest with TestCommon {

  val features = Seq(height, weight, gender, description, age).transmogrify()
  val survivedNum = survived.occurs()

  val prediction = BinaryClassificationModelSelector.withTrainValidationSplit(
    splitter = None, modelTypesToUse = Seq(OpLogisticRegression)
  ).setInput(survivedNum, features).getOutput()

  val workflow = new OpWorkflow().setResultFeatures(prediction, survivedNum).setReader(dataReader)

  lazy val model = workflow.train()

  lazy val modelLocation = {
    val path = new File(tempDir + "/op-runner-local-test-model").toString
    model.save(path)
    path
  }

  lazy val rawData = dataReader.generateDataFrame(model.rawFeatures).collect().map(_.toMap)

  lazy val expectedScores = model.score().collect(prediction, survivedNum)

  // TODO: actually test spark wrapped stage with PFA
  Spec(classOf[OpWorkflowRunnerLocal]) should "produce scores without Spark" in {
    val params = new OpParams().withValues(modelLocation = Some(modelLocation))
    val scoreFn = new OpWorkflowRunnerLocal(workflow).score(params)
    val _ = rawData.map(row => scoreFn(row)) // warm up

    val numOfRuns = 1000
    var elapsed = 0L
    for { _ <- 0 until numOfRuns } {
      val start = System.currentTimeMillis()
      val scores = rawData.map(row => scoreFn(row))
      elapsed += System.currentTimeMillis() - start
      for {
        (score, (predV, survivedV)) <- scores.zip(expectedScores)
        expected = Map(
          prediction.name -> predV.value,
          survivedNum.name -> survivedV.value.get
        )
      } score shouldBe expected
    }
    println(s"Scored ${expectedScores.length * numOfRuns} records in ${elapsed}ms")
    println(s"Average time per record: ${elapsed.toDouble / (expectedScores.length * numOfRuns)}ms")
  }

}
2 changes: 1 addition & 1 deletion settings.gradle
@@ -2,4 +2,4 @@ enableFeaturePreview('STABLE_PUBLISHING')

rootProject.name='transmogrifai'

- include 'utils', 'features', 'readers', 'core', 'models', 'testkit', 'cli', 'templates:simple'
+ include 'utils', 'features', 'readers', 'core', 'models', 'testkit', 'cli', 'templates:simple', 'local'