Skip to content

Commit 9a2bf3a

Browse files
committed
WIP
1 parent 6ba45d9 commit 9a2bf3a

File tree

4 files changed

+92
-9
lines changed

4 files changed

+92
-9
lines changed

online/src/main/java/ai/chronon/online/JavaFetcher.java

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import scala.collection.mutable.ArrayBuffer;
2525
import scala.compat.java8.FutureConverters;
2626
import scala.concurrent.Future;
27+
import scala.concurrent.ExecutionContext;
2728

2829
import java.util.ArrayList;
2930
import java.util.List;
@@ -35,15 +36,83 @@ public class JavaFetcher {
3536
Fetcher fetcher;
3637

3738
public JavaFetcher(KVStore kvStore, String metaDataSet, Long timeoutMillis, Consumer<LoggableResponse> logFunc, ExternalSourceRegistry registry, String callerName, Boolean disableErrorThrows) {
38-
this.fetcher = new Fetcher(kvStore, metaDataSet, timeoutMillis, logFunc, false, registry, callerName, null, disableErrorThrows);
39+
this.fetcher = new Fetcher(kvStore, metaDataSet, timeoutMillis, logFunc, false, registry, callerName, null, disableErrorThrows, null);
3940
}
4041

4142
public JavaFetcher(KVStore kvStore, String metaDataSet, Long timeoutMillis, Consumer<LoggableResponse> logFunc, ExternalSourceRegistry registry) {
42-
this.fetcher = new Fetcher(kvStore, metaDataSet, timeoutMillis, logFunc, false, registry, null, null, false);
43+
this.fetcher = new Fetcher(kvStore, metaDataSet, timeoutMillis, logFunc, false, registry, null, null, false, null);
4344
}
4445

4546
public JavaFetcher(KVStore kvStore, String metaDataSet, Long timeoutMillis, Consumer<LoggableResponse> logFunc, ExternalSourceRegistry registry, String callerName, FlagStore flagStore, Boolean disableErrorThrows) {
46-
this.fetcher = new Fetcher(kvStore, metaDataSet, timeoutMillis, logFunc, false, registry, callerName, flagStore, disableErrorThrows);
47+
this.fetcher = new Fetcher(kvStore, metaDataSet, timeoutMillis, logFunc, false, registry, callerName, flagStore, disableErrorThrows, null);
48+
}
49+
50+
/* user builder pattern to create JavaFetcher
51+
example way to create the java fetcher
52+
JavaFetcher fetcher = new JavaFetcher.Builder(kvStore, metaDataSet, timeoutMillis, logFunc, registry)
53+
.callerName(callerName)
54+
.flagStore(flagStore)
55+
.disableErrorThrows(disableErrorThrows)
56+
.build();
57+
*/
58+
private JavaFetcher(Builder builder) {
59+
this.fetcher = new Fetcher(builder.kvStore,
60+
builder.metaDataSet,
61+
builder.timeoutMillis,
62+
builder.logFunc,
63+
builder.debug,
64+
builder.registry,
65+
builder.callerName,
66+
builder.flagStore,
67+
builder.disableErrorThrows,
68+
builder.executionContextOverride);
69+
}
70+
71+
public static class Builder {
72+
private KVStore kvStore;
73+
private String metaDataSet;
74+
private Long timeoutMillis;
75+
private Consumer<LoggableResponse> logFunc;
76+
private ExternalSourceRegistry registry;
77+
private String callerName;
78+
private Boolean debug;
79+
private FlagStore flagStore;
80+
private Boolean disableErrorThrows;
81+
private ExecutionContext executionContextOverride;
82+
83+
public Builder(KVStore kvStore, String metaDataSet, Long timeoutMillis,
84+
Consumer<LoggableResponse> logFunc, ExternalSourceRegistry registry) {
85+
this.kvStore = kvStore;
86+
this.metaDataSet = metaDataSet;
87+
this.timeoutMillis = timeoutMillis;
88+
this.logFunc = logFunc;
89+
this.registry = registry;
90+
}
91+
public Builder callerName(String callerName) {
92+
this.callerName = callerName;
93+
return this;
94+
}
95+
public Builder flagStore(FlagStore flagStore) {
96+
this.flagStore = flagStore;
97+
return this;
98+
}
99+
public Builder disableErrorThrows(Boolean disableErrorThrows) {
100+
this.disableErrorThrows = disableErrorThrows;
101+
return this;
102+
}
103+
public Builder debug(Boolean debug) {
104+
this.debug = debug;
105+
return this;
106+
}
107+
108+
public Builder executionContextOverride(ExecutionContext executionContextOverride) {
109+
this.executionContextOverride = executionContextOverride;
110+
return this;
111+
}
112+
113+
public JavaFetcher build() {
114+
return new JavaFetcher(this);
115+
}
47116
}
48117

49118

online/src/main/scala/ai/chronon/online/Fetcher.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import scala.collection.JavaConverters._
4343
import scala.collection.Seq
4444
import scala.collection.mutable
4545
import scala.collection.mutable.ListBuffer
46+
import scala.concurrent.ExecutionContext
4647
import scala.concurrent.Future
4748
import scala.util.Failure
4849
import scala.util.Success
@@ -95,8 +96,15 @@ class Fetcher(val kvStore: KVStore,
9596
val externalSourceRegistry: ExternalSourceRegistry = null,
9697
callerName: String = null,
9798
flagStore: FlagStore = null,
98-
disableErrorThrows: Boolean = false)
99-
extends FetcherBase(kvStore, metaDataSet, timeoutMillis, debug, flagStore, disableErrorThrows) {
99+
disableErrorThrows: Boolean = false,
100+
executionContextOverride: ExecutionContext = null)
101+
extends FetcherBase(kvStore,
102+
metaDataSet,
103+
timeoutMillis,
104+
debug,
105+
flagStore,
106+
disableErrorThrows,
107+
executionContextOverride) {
100108

101109
private def reportCallerNameFetcherVersion(): Unit = {
102110
val message = s"CallerName: ${Option(callerName).getOrElse("N/A")}, FetcherVersion: ${BuildInfo.version}"

online/src/main/scala/ai/chronon/online/FetcherBase.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import com.google.gson.Gson
4444
import java.util
4545
import scala.collection.JavaConverters._
4646
import scala.collection.Seq
47+
import scala.concurrent.ExecutionContext
4748
import scala.concurrent.Future
4849
import scala.util.Failure
4950
import scala.util.Success
@@ -58,8 +59,9 @@ class FetcherBase(kvStore: KVStore,
5859
timeoutMillis: Long = 10000,
5960
debug: Boolean = false,
6061
flagStore: FlagStore = null,
61-
disableErrorThrows: Boolean = false)
62-
extends MetadataStore(kvStore, metaDataSet, timeoutMillis)
62+
disableErrorThrows: Boolean = false,
63+
executionContextOverride: ExecutionContext = null)
64+
extends MetadataStore(kvStore, metaDataSet, timeoutMillis, executionContextOverride)
6365
with FetcherCache {
6466
import FetcherBase._
6567

online/src/main/scala/ai/chronon/online/MetadataStore.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ import scala.util.Try
4141
// [timestamp -> {metric name -> metric value}]
4242
case class DataMetrics(series: Seq[(Long, SortedMap[String, Any])])
4343

44-
class MetadataStore(kvStore: KVStore, val dataset: String = MetadataDataset, timeoutMillis: Long) {
44+
class MetadataStore(kvStore: KVStore,
45+
val dataset: String = MetadataDataset,
46+
timeoutMillis: Long,
47+
executionContextOverride: ExecutionContext = null) {
4548
@transient implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass)
4649
private var partitionSpec = PartitionSpec(format = "yyyy-MM-dd", spanMillis = WindowUtils.Day.millis)
4750
private val CONF_BATCH_SIZE = 50
@@ -56,7 +59,8 @@ class MetadataStore(kvStore: KVStore, val dataset: String = MetadataDataset, tim
5659
partitionSpec = PartitionSpec(format = format, spanMillis = partitionSpec.spanMillis)
5760
}
5861

59-
implicit val executionContext: ExecutionContext = kvStore.executionContext
62+
implicit val executionContext: ExecutionContext =
63+
Option(executionContextOverride).getOrElse(FlexibleExecutionContext.buildExecutionContext)
6064

6165
def getConf[T <: TBase[_, _]: Manifest](confPathOrName: String): Try[T] = {
6266
val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]

0 commit comments

Comments
 (0)