 * limitations under the License.
 */
 
-package ai.chronon.spark.test
+package ai.chronon.spark.test.fetcher
 
 import ai.chronon.aggregator.test.Column
 import ai.chronon.aggregator.windowing.TsUtils
 import ai.chronon.api
 import ai.chronon.api.Constants.MetadataDataset
-import ai.chronon.api.Extensions.JoinOps
-import ai.chronon.api.Extensions.MetadataOps
+import ai.chronon.api.Extensions.{JoinOps, MetadataOps}
 import ai.chronon.api.ScalaJavaConversions._
 import ai.chronon.api._
-import ai.chronon.online.Fetcher.Request
-import ai.chronon.online.Fetcher.Response
-import ai.chronon.online.Fetcher.StatsRequest
-import ai.chronon.online.FlagStore
-import ai.chronon.online.FlagStoreConstants
-import ai.chronon.online.JavaRequest
+import ai.chronon.online.Fetcher.{Request, StatsRequest}
 import ai.chronon.online.KVStore.GetRequest
-import ai.chronon.online.LoggableResponseBase64
-import ai.chronon.online.MetadataDirWalker
-import ai.chronon.online.MetadataEndPoint
-import ai.chronon.online.MetadataStore
-import ai.chronon.online.SparkConversions
+import ai.chronon.online._
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.stats.ConsistencyJob
+import ai.chronon.spark.test.{DataFrameGen, OnlineUtils, SchemaEvolutionUtils}
 import ai.chronon.spark.utils.MockApi
 import ai.chronon.spark.{Join => _, _}
 import com.google.gson.GsonBuilder
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.functions.avg
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.functions.lit
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertFalse
-import org.junit.Assert.assertTrue
+import org.apache.spark.sql.functions.{avg, col, lit}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.scalatest.flatspec.AnyFlatSpec
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}
 
-import java.lang
-import java.util
 import java.util.TimeZone
 import java.util.concurrent.Executors
+import java.{lang, util}
 import scala.collection.Seq
-import scala.compat.java8.FutureConverters
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
 import scala.concurrent.duration.Duration
-import scala.concurrent.duration.SECONDS
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.io.Source
 
 class FetcherTest extends AnyFlatSpec {
@@ -86,8 +65,8 @@ class FetcherTest extends AnyFlatSpec {
 
     val joinPath = "joins/team/example_join.v1"
     val confResource = getClass.getResource(s"/$joinPath")
+    val src = Source.fromResource(joinPath)
     println(s"conf resource path for dir walker: ${confResource.getPath}")
-    val src = Source.fromFile(confResource.getPath)
 
     val expected = {
       try src.mkString
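
Note on the hunk above: Source.fromFile(confResource.getPath) only works while the conf is a real file on disk; once test resources are packaged into a jar, getResource(...).getPath no longer yields a readable filesystem path, whereas Source.fromResource resolves through the classloader either way. A minimal self-contained sketch of the two approaches (ResourceReadSketch is illustrative; only the resource path comes from the test):

import scala.io.Source

object ResourceReadSketch extends App {
  val joinPath = "joins/team/example_join.v1"

  // Classloader-based read: resolves the resource whether it sits on disk
  // or inside a packaged jar.
  val viaClasspath = Source.fromResource(joinPath)
  try println(s"classpath read: ${viaClasspath.mkString.length} chars")
  finally viaClasspath.close()

  // File-based read: getResource(...).getPath is only a usable filesystem
  // path when the resource is not inside a jar, which is what the hunk
  // above moves away from.
  val confResource = getClass.getResource(s"/$joinPath")
  val viaFile = Source.fromFile(confResource.getPath)
  try println(s"file read: ${viaFile.mkString.length} chars")
  finally viaFile.close()
}
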
@@ -785,102 +764,3 @@ class FetcherTest extends AnyFlatSpec {
     assertTrue(responseMap.keys.forall(_.endsWith("_exception")))
   }
 }
-
-object FetcherTestUtil {
-  @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
-  def joinResponses(spark: SparkSession,
-                    requests: Array[Request],
-                    mockApi: MockApi,
-                    useJavaFetcher: Boolean = false,
-                    runCount: Int = 1,
-                    samplePercent: Double = -1,
-                    logToHive: Boolean = false,
-                    debug: Boolean = false)(implicit ec: ExecutionContext): (List[Response], DataFrame) = {
-    val chunkSize = 100
-    @transient lazy val fetcher = mockApi.buildFetcher(debug)
-    @transient lazy val javaFetcher = mockApi.buildJavaFetcher()
-
-    def fetchOnce = {
-      var latencySum: Long = 0
-      var latencyCount = 0
-      val blockStart = System.currentTimeMillis()
-      val result = requests.iterator
-        .grouped(chunkSize)
-        .map { oldReqs =>
-          // deliberately mis-type a few keys
-          val r = oldReqs
-            .map(r =>
-              r.copy(keys = r.keys.mapValues { v =>
-                if (v.isInstanceOf[java.lang.Long]) v.toString else v
-              }.toMap))
-          val responses = if (useJavaFetcher) {
-            // Converting to java request and using the toScalaRequest functionality to test conversion
-            val convertedJavaRequests = r.map(new JavaRequest(_)).toJava
-            val javaResponse = javaFetcher.fetchJoin(convertedJavaRequests)
-            FutureConverters
-              .toScala(javaResponse)
-              .map(
-                _.toScala.map(jres =>
-                  Response(
-                    Request(jres.request.name, jres.request.keys.toScala.toMap, Option(jres.request.atMillis)),
-                    jres.values.toScala.map(_.toScala)
-                  )))
-          } else {
-            fetcher.fetchJoin(r)
-          }
-
-          // fix mis-typed keys in the request
-          val fixedResponses =
-            responses.map(resps => resps.zip(oldReqs).map { case (resp, req) => resp.copy(request = req) })
-          System.currentTimeMillis() -> fixedResponses
-        }
-        .flatMap { case (start, future) =>
-          val result = Await.result(future, Duration(10000, SECONDS)) // todo: change back to millis
-          val latency = System.currentTimeMillis() - start
-          latencySum += latency
-          latencyCount += 1
-          result
-        }
-        .toList
-      val latencyMillis = latencySum.toFloat / latencyCount.toFloat
-      val qps = (requests.length * 1000.0) / (System.currentTimeMillis() - blockStart).toFloat
-      (latencyMillis, qps, result)
-    }
-
-    // to overwhelm the profiler with fetching code path
-    // so as to make it prominent in the flamegraph & collect enough stats
-
-    var latencySum = 0.0
-    var qpsSum = 0.0
-    var loggedValues: Seq[LoggableResponseBase64] = null
-    var result: List[Response] = null
-    (0 until runCount).foreach { _ =>
-      val (latency, qps, resultVal) = fetchOnce
-      result = resultVal
-      loggedValues = mockApi.flushLoggedValues
-      latencySum += latency
-      qpsSum += qps
-    }
-    val fetcherNameString = if (useJavaFetcher) "Java" else "Scala"
-
-    logger.info(s"""
-                   |Averaging fetching stats for $fetcherNameString Fetcher over ${requests.length} requests $runCount times
-                   |with batch size: $chunkSize
-                   |average qps: ${qpsSum / runCount}
-                   |average latency: ${latencySum / runCount}
-                   |""".stripMargin)
-    val loggedDf = mockApi.loggedValuesToDf(loggedValues, spark)
-    if (logToHive) {
-      TableUtils(spark).insertPartitions(
-        loggedDf,
-        mockApi.logTable,
-        partitionColumns = Seq("ds", "name")
-      )
-    }
-    if (samplePercent > 0) {
-      logger.info(s"logged count: ${loggedDf.count()}")
-      loggedDf.show()
-    }
-    result -> loggedDf
-  }
-}
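
The deleted FetcherTestUtil.joinResponses above did three things: split the request array into chunks of 100, fetched each chunk through either the Scala or Java fetcher (awaiting each batch before issuing the next), and averaged latency and QPS over runCount repetitions so the fetch path dominates a profile. A stripped-down sketch of that chunk-fetch-await loop, assuming stand-in Request/Response types and a stubbed fetchJoin (the real ones live in ai.chronon.online.Fetcher):

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object BatchFetchSketch {
  // Stand-ins for ai.chronon.online.Fetcher.Request / Response.
  case class Request(name: String, keys: Map[String, Any])
  case class Response(request: Request, value: String)

  // Stub for the real fetcher call; completes immediately with fake values.
  def fetchJoin(batch: Seq[Request]): Future[Seq[Response]] =
    Future.successful(batch.map(r => Response(r, s"value-for-${r.name}")))

  // Chunk the requests, fetch one chunk at a time, and block on each batch
  // before issuing the next, mirroring the shape of the removed joinResponses.
  def joinResponses(requests: Array[Request], chunkSize: Int = 100): (List[Response], Double) = {
    val blockStart = System.currentTimeMillis()
    val responses = requests.iterator
      .grouped(chunkSize)
      .map(batch => fetchJoin(batch))
      .flatMap(future => Await.result(future, Duration.Inf))
      .toList
    val elapsedMs = (System.currentTimeMillis() - blockStart).max(1L)
    val qps = requests.length * 1000.0 / elapsedMs
    (responses, qps)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical request names and keys, purely for illustration.
    val requests = (1 to 250).map(i => Request(s"user_join_$i", Map("user" -> i))).toArray
    val (responses, qps) = joinResponses(requests)
    println(s"fetched ${responses.size} responses at ~$qps qps")
  }
}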