Skip to content

Commit c57b03b

Browse files
authored
refactor: fetcher sub package + kill old stats in fetcher (#423)
1 parent ce8ab5f commit c57b03b

29 files changed

+120
-466
lines changed

online/src/main/java/ai/chronon/online/JavaExternalSourceHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package ai.chronon.online;
1818

19+
import ai.chronon.online.fetcher.Fetcher;
1920
import scala.collection.Seq;
2021
import scala.compat.java8.FutureConverters;
2122
import scala.concurrent.Future;

online/src/main/java/ai/chronon/online/JavaFetcher.java

Lines changed: 10 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package ai.chronon.online;
1818

19-
import ai.chronon.online.Fetcher.Request;
20-
import ai.chronon.online.Fetcher.Response;
19+
import ai.chronon.online.fetcher.Fetcher;
20+
import ai.chronon.online.fetcher.FetcherResponseWithTs;
2121
import scala.collection.Iterator;
2222
import scala.collection.Seq;
2323
import scala.Option;
@@ -120,9 +120,9 @@ public JavaFetcher build() {
120120
}
121121

122122

123-
public static List<JavaResponse> toJavaResponses(Seq<Response> responseSeq) {
123+
public static List<JavaResponse> toJavaResponses(Seq<Fetcher.Response> responseSeq) {
124124
List<JavaResponse> result = new ArrayList<>(responseSeq.size());
125-
Iterator<Response> it = responseSeq.iterator();
125+
Iterator<Fetcher.Response> it = responseSeq.iterator();
126126
while (it.hasNext()) {
127127
result.add(new JavaResponse(it.next()));
128128
}
@@ -139,44 +139,21 @@ private CompletableFuture<List<JavaResponse>> convertResponsesWithTs(Future<Fetc
139139
});
140140
}
141141

142-
public static List<JavaStatsResponse> toJavaStatsResponses(Seq<Fetcher.StatsResponse> responseSeq) {
143-
List<JavaStatsResponse> result = new ArrayList<>(responseSeq.size());
144-
Iterator<Fetcher.StatsResponse> it = responseSeq.iterator();
145-
while(it.hasNext()) {
146-
result.add(toJavaStatsResponse(it.next()));
147-
}
148-
return result;
149-
}
150-
151-
public static JavaStatsResponse toJavaStatsResponse(Fetcher.StatsResponse response) {
152-
return new JavaStatsResponse(response);
153-
}
154-
public static JavaSeriesStatsResponse toJavaSeriesStatsResponse(Fetcher.SeriesStatsResponse response) {
155-
return new JavaSeriesStatsResponse(response);
156-
}
157-
158-
private CompletableFuture<List<JavaStatsResponse>> convertStatsResponses(Future<Seq<Fetcher.StatsResponse>> responses) {
159-
return FutureConverters
160-
.toJava(responses)
161-
.toCompletableFuture()
162-
.thenApply(JavaFetcher::toJavaStatsResponses);
163-
}
164-
165-
private Seq<Request> convertJavaRequestList(List<JavaRequest> requests, boolean isGroupBy, long startTs) {
166-
ArrayBuffer<Request> scalaRequests = new ArrayBuffer<>();
142+
private Seq<Fetcher.Request> convertJavaRequestList(List<JavaRequest> requests, boolean isGroupBy, long startTs) {
143+
ArrayBuffer<Fetcher.Request> scalaRequests = new ArrayBuffer<>();
167144
for (JavaRequest request : requests) {
168-
Request convertedRequest = request.toScalaRequest();
145+
Fetcher.Request convertedRequest = request.toScalaRequest();
169146
scalaRequests.$plus$eq(convertedRequest);
170147
}
171-
Seq<Request> scalaRequestsSeq = scalaRequests.toSeq();
148+
Seq<Fetcher.Request> scalaRequestsSeq = scalaRequests.toSeq();
172149
instrument(requests.stream().map(jReq -> jReq.name).collect(Collectors.toList()), isGroupBy, "java.request_conversion.latency.millis", startTs);
173150
return scalaRequestsSeq;
174151
}
175152

176153
public CompletableFuture<List<JavaResponse>> fetchGroupBys(List<JavaRequest> requests) {
177154
long startTs = System.currentTimeMillis();
178155
// Convert java requests to scala requests
179-
Seq<Request> scalaRequests = convertJavaRequestList(requests, true, startTs);
156+
Seq<Fetcher.Request> scalaRequests = convertJavaRequestList(requests, true, startTs);
180157
// Get responses from the fetcher
181158
Future<FetcherResponseWithTs> scalaResponses = this.fetcher.withTs(this.fetcher.fetchGroupBys(scalaRequests));
182159
// Convert responses to CompletableFuture
@@ -186,7 +163,7 @@ public CompletableFuture<List<JavaResponse>> fetchGroupBys(List<JavaRequest> req
186163
public CompletableFuture<List<JavaResponse>> fetchJoin(List<JavaRequest> requests) {
187164
long startTs = System.currentTimeMillis();
188165
// Convert java requests to scala requests
189-
Seq<Request> scalaRequests = convertJavaRequestList(requests, false, startTs);
166+
Seq<Fetcher.Request> scalaRequests = convertJavaRequestList(requests, false, startTs);
190167
// Get responses from the fetcher
191168
Future<FetcherResponseWithTs> scalaResponses = this.fetcher.withTs(this.fetcher.fetchJoin(scalaRequests, Option.empty()));
192169
// Convert responses to CompletableFuture
@@ -213,10 +190,4 @@ private Metrics.Context getJoinContext(String joinName) {
213190
private Metrics.Context getGroupByContext(String groupByName) {
214191
return new Metrics.Context("group_by.fetch", null, groupByName, null, false, null, null, null, null);
215192
}
216-
217-
public CompletableFuture<JavaSeriesStatsResponse> fetchConsistencyMetricsTimeseries(JavaStatsRequest request) {
218-
Future<Fetcher.SeriesStatsResponse> response = this.fetcher.fetchConsistencyMetricsTimeseries(request.toScalaRequest());
219-
// Convert responses to CompletableFuture
220-
return FutureConverters.toJava(response).toCompletableFuture().thenApply(JavaFetcher::toJavaSeriesStatsResponse);
221-
}
222193
}

online/src/main/java/ai/chronon/online/JavaRequest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package ai.chronon.online;
1818

19+
import ai.chronon.online.fetcher.Fetcher;
1920
import scala.Option;
2021
import ai.chronon.api.ScalaJavaConversions;
2122

online/src/main/java/ai/chronon/online/JavaResponse.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package ai.chronon.online;
1818

1919
import ai.chronon.api.ScalaJavaConversions;
20+
import ai.chronon.online.fetcher.Fetcher;
2021

2122
import java.util.Map;
2223

online/src/main/java/ai/chronon/online/JavaSeriesStatsResponse.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

online/src/main/java/ai/chronon/online/JavaStatsRequest.java

Lines changed: 0 additions & 68 deletions
This file was deleted.

online/src/main/java/ai/chronon/online/JavaStatsResponse.java

Lines changed: 0 additions & 49 deletions
This file was deleted.

online/src/main/scala/ai/chronon/online/Api.scala

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package ai.chronon.online
1919
import ai.chronon.api.Constants
2020
import ai.chronon.api.StructType
2121
import ai.chronon.online.KVStore._
22+
import ai.chronon.online.fetcher.Fetcher
2223
import org.apache.spark.sql.SparkSession
2324
import org.slf4j.Logger
2425
import org.slf4j.LoggerFactory
@@ -259,15 +260,17 @@ abstract class Api(userConf: Map[String, String]) extends Serializable {
259260
final def buildFetcher(debug: Boolean = false,
260261
callerName: String = null,
261262
disableErrorThrows: Boolean = false): Fetcher =
262-
new Fetcher(genKvStore,
263-
Constants.MetadataDataset,
264-
logFunc = responseConsumer,
265-
debug = debug,
266-
externalSourceRegistry = externalRegistry,
267-
timeoutMillis = timeoutMillis,
268-
callerName = callerName,
269-
flagStore = flagStore,
270-
disableErrorThrows = disableErrorThrows)
263+
new Fetcher(
264+
genKvStore,
265+
Constants.MetadataDataset,
266+
logFunc = responseConsumer,
267+
debug = debug,
268+
externalSourceRegistry = externalRegistry,
269+
timeoutMillis = timeoutMillis,
270+
callerName = callerName,
271+
flagStore = flagStore,
272+
disableErrorThrows = disableErrorThrows
273+
)
271274

272275
final def buildJavaFetcher(callerName: String = null, disableErrorThrows: Boolean = false): JavaFetcher = {
273276
new JavaFetcher(genKvStore,

online/src/main/scala/ai/chronon/online/ExternalSourceRegistry.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package ai.chronon.online
1818

1919
import ai.chronon.api.Constants
20-
import ai.chronon.online.Fetcher.Request
21-
import ai.chronon.online.Fetcher.Response
20+
import ai.chronon.online.fetcher.Fetcher.Request
21+
import ai.chronon.online.fetcher.Fetcher.Response
2222

2323
import scala.collection.Seq
2424
import scala.collection.mutable

online/src/main/scala/ai/chronon/online/OnlineDerivationUtil.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import ai.chronon.api.LongType
77
import ai.chronon.api.StringType
88
import ai.chronon.api.StructField
99
import ai.chronon.api.StructType
10-
import ai.chronon.online.Fetcher.Request
10+
import ai.chronon.online.fetcher.Fetcher
1111

1212
import scala.collection.Seq
1313

@@ -64,7 +64,7 @@ object OnlineDerivationUtil {
6464

6565
def applyDeriveFunc(
6666
deriveFunc: DerivationFunc,
67-
request: Request,
67+
request: Fetcher.Request,
6868
baseMap: Map[String, AnyRef]
6969
): Map[String, AnyRef] = {
7070
val requestTs = request.atMillis.getOrElse(System.currentTimeMillis())

0 commit comments

Comments
 (0)