Skip to content

Commit f8cd600

Browse files
committed
fix review4: modelcache thrcache as bean
1 parent 8bfea6b commit f8cd600

File tree

6 files changed

+85
-78
lines changed

6 files changed

+85
-78
lines changed

extra/modules/greenbids-real-time-data/src/main/java/org/prebid/server/hooks/modules/greenbids/real/time/data/config/GreenbidsRealTimeDataConfiguration.java

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.config.GreenbidsRealTimeDataProperties;
1313
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.data.GreenbidsInferenceDataService;
1414
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.predictor.FilterService;
15+
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.predictor.ModelCache;
1516
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.predictor.OnnxModelRunner;
1617
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.predictor.OnnxModelRunnerWithThresholds;
18+
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.predictor.ThresholdCache;
1719
import org.prebid.server.hooks.modules.greenbids.real.time.data.v1.GreenbidsRealTimeDataModule;
1820
import org.prebid.server.hooks.modules.greenbids.real.time.data.v1.GreenbidsRealTimeDataProcessedAuctionRequestHook;
1921
import org.prebid.server.json.ObjectMapperProvider;
@@ -33,8 +35,10 @@
3335
public class GreenbidsRealTimeDataConfiguration {
3436

3537
@Bean
36-
public GreenbidsInferenceDataService greenbidsInferenceDataService(GreenbidsRealTimeDataProperties properties) {
37-
final ObjectMapper mapper = ObjectMapperProvider.mapper();
38+
public GreenbidsInferenceDataService greenbidsInferenceDataService(
39+
GreenbidsRealTimeDataProperties properties,
40+
ObjectMapper mapper) {
41+
3842
final File database = new File(properties.getGeoLiteCountryPath());
3943
DatabaseReader dbReader;
4044
try {
@@ -49,8 +53,9 @@ public GreenbidsInferenceDataService greenbidsInferenceDataService(GreenbidsReal
4953
GreenbidsRealTimeDataModule greenbidsRealTimeDataModule(
5054
FilterService filterService,
5155
OnnxModelRunnerWithThresholds onnxModelRunnerWithThresholds,
52-
GreenbidsInferenceDataService greenbidsInferenceDataService) {
53-
final ObjectMapper mapper = ObjectMapperProvider.mapper();
56+
GreenbidsInferenceDataService greenbidsInferenceDataService,
57+
ObjectMapper mapper) {
58+
5459
return new GreenbidsRealTimeDataModule(List.of(
5560
new GreenbidsRealTimeDataProcessedAuctionRequestHook(
5661
mapper,
@@ -65,27 +70,51 @@ public FilterService filterService() {
6570
}
6671

6772
@Bean
68-
public OnnxModelRunnerWithThresholds onnxModelRunnerWithThresholds(
69-
GreenbidsRealTimeDataProperties properties, Vertx vertx) {
70-
71-
final Storage storage = StorageOptions.newBuilder()
73+
public Storage storage(GreenbidsRealTimeDataProperties properties) {
74+
return StorageOptions.newBuilder()
7275
.setProjectId(properties.getGoogleCloudGreenbidsProject()).build().getService();
76+
}
7377

78+
@Bean
79+
public ModelCache modelCache(GreenbidsRealTimeDataProperties properties, Vertx vertx, Storage storage) {
7480
final Cache<String, OnnxModelRunner> modelCacheWithExpiration = Caffeine.newBuilder()
7581
.expireAfterWrite(properties.getCacheExpirationMinutes(), TimeUnit.MINUTES)
7682
.build();
7783

84+
return new ModelCache(
85+
storage,
86+
properties.getGcsBucketName(),
87+
modelCacheWithExpiration,
88+
properties.getOnnxModelCacheKeyPrefix(),
89+
vertx
90+
);
91+
}
92+
93+
@Bean
94+
public ThresholdCache thresholdCache(
95+
GreenbidsRealTimeDataProperties properties,
96+
Vertx vertx,
97+
Storage storage,
98+
ObjectMapper mapper) {
99+
78100
final Cache<String, ThrottlingThresholds> thresholdsCacheWithExpiration = Caffeine.newBuilder()
79101
.expireAfterWrite(properties.getCacheExpirationMinutes(), TimeUnit.MINUTES)
80102
.build();
81103

82-
return new OnnxModelRunnerWithThresholds(
83-
modelCacheWithExpiration,
84-
thresholdsCacheWithExpiration,
104+
return new ThresholdCache(
85105
storage,
86106
properties.getGcsBucketName(),
87-
properties.getOnnxModelCacheKeyPrefix(),
107+
mapper,
108+
thresholdsCacheWithExpiration,
88109
properties.getThresholdsCacheKeyPrefix(),
89110
vertx);
90111
}
112+
113+
@Bean
114+
public OnnxModelRunnerWithThresholds onnxModelRunnerWithThresholds(
115+
ModelCache modelCache,
116+
ThresholdCache thresholdCache) {
117+
118+
return new OnnxModelRunnerWithThresholds(modelCache, thresholdCache);
119+
}
91120
}

extra/modules/greenbids-real-time-data/src/main/java/org/prebid/server/hooks/modules/greenbids/real/time/data/model/predictor/ModelCache.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ public class ModelCache {
2020

2121
String gcsBucketName;
2222

23-
String modelPath;
24-
2523
Cache<String, OnnxModelRunner> cache;
2624

2725
Storage storage;
@@ -33,22 +31,20 @@ public class ModelCache {
3331
Vertx vertx;
3432

3533
public ModelCache(
36-
String modelPath,
3734
Storage storage,
3835
String gcsBucketName,
3936
Cache<String, OnnxModelRunner> cache,
4037
String onnxModelCacheKeyPrefix,
4138
Vertx vertx) {
4239
this.gcsBucketName = gcsBucketName;
43-
this.modelPath = modelPath;
4440
this.cache = cache;
4541
this.storage = storage;
4642
this.onnxModelCacheKeyPrefix = onnxModelCacheKeyPrefix;
4743
this.isFetching = new AtomicBoolean(false);
4844
this.vertx = vertx;
4945
}
5046

51-
public Future<OnnxModelRunner> getModelRunner(String pbuid) {
47+
public Future<OnnxModelRunner> getModelRunner(String onnxModelPath, String pbuid) {
5248
final String cacheKey = onnxModelCacheKeyPrefix + pbuid;
5349
final OnnxModelRunner cachedOnnxModelRunner = cache.getIfPresent(cacheKey);
5450

@@ -58,7 +54,7 @@ public Future<OnnxModelRunner> getModelRunner(String pbuid) {
5854

5955
if (isFetching.compareAndSet(false, true)) {
6056
try {
61-
fetchAndCacheModelRunner(cacheKey);
57+
fetchAndCacheModelRunner(onnxModelPath, cacheKey);
6258
} finally {
6359
isFetching.set(false);
6460
}
@@ -67,17 +63,20 @@ public Future<OnnxModelRunner> getModelRunner(String pbuid) {
6763
return Future.failedFuture("ModelRunner fetching in progress");
6864
}
6965

70-
private void fetchAndCacheModelRunner(String cacheKey) {
71-
vertx.executeBlocking(this::getBlob)
72-
.map(this::loadModelRunner)
66+
private void fetchAndCacheModelRunner(String onnxModelPath, String cacheKey) {
67+
vertx.executeBlocking(promise -> {
68+
Blob blob = getBlob(onnxModelPath);
69+
promise.complete(blob);
70+
})
71+
.map(blob -> loadModelRunner((Blob) blob))
7372
.onSuccess(onnxModelRunner -> cache.put(cacheKey, onnxModelRunner))
7473
.onFailure(error -> logger.error("Failed to fetch ONNX model"));
7574
}
7675

77-
private Blob getBlob() {
76+
private Blob getBlob(String onnxModelPath) {
7877
try {
7978
return Optional.ofNullable(storage.get(gcsBucketName))
80-
.map(bucket -> bucket.get(modelPath))
79+
.map(bucket -> bucket.get(onnxModelPath))
8180
.orElseThrow(() -> new PreBidException("Bucket not found: " + gcsBucketName));
8281
} catch (StorageException e) {
8382
throw new PreBidException("Error accessing GCS artefact for model: ", e);

extra/modules/greenbids-real-time-data/src/main/java/org/prebid/server/hooks/modules/greenbids/real/time/data/model/predictor/OnnxModelRunnerWithThresholds.java

Lines changed: 9 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -10,54 +10,24 @@
1010

1111
public class OnnxModelRunnerWithThresholds {
1212

13-
private final Cache<String, OnnxModelRunner> modelCacheWithExpiration;
14-
private final Cache<String, ThrottlingThresholds> thresholdsCacheWithExpiration;
15-
private final String gcsBucketName;
16-
private final String onnxModelCacheKeyPrefix;
17-
private final String thresholdsCacheKeyPrefix;
18-
private final Storage storage;
19-
private final Vertx vertx;
13+
private final ModelCache modelCache;
14+
private final ThresholdCache thresholdCache;
2015

2116
public OnnxModelRunnerWithThresholds(
22-
Cache<String, OnnxModelRunner> modelCacheWithExpiration,
23-
Cache<String, ThrottlingThresholds> thresholdsCacheWithExpiration,
24-
Storage storage,
25-
String gcsBucketName,
26-
String onnxModelCacheKeyPrefix,
27-
String thresholdsCacheKeyPrefix,
28-
Vertx vertx) {
29-
this.modelCacheWithExpiration = modelCacheWithExpiration;
30-
this.thresholdsCacheWithExpiration = thresholdsCacheWithExpiration;
31-
this.gcsBucketName = gcsBucketName;
32-
this.onnxModelCacheKeyPrefix = onnxModelCacheKeyPrefix;
33-
this.thresholdsCacheKeyPrefix = thresholdsCacheKeyPrefix;
34-
this.storage = storage;
35-
this.vertx = vertx;
17+
ModelCache modelCache,
18+
ThresholdCache thresholdCache) {
19+
this.modelCache = modelCache;
20+
this.thresholdCache = thresholdCache;
3621
}
3722

3823
public Future<OnnxModelRunner> retrieveOnnxModelRunner(Partner partner) {
3924
final String onnxModelPath = "models_pbuid=" + partner.getPbuid() + ".onnx";
40-
final ModelCache modelCache = new ModelCache(
41-
onnxModelPath,
42-
storage,
43-
gcsBucketName,
44-
modelCacheWithExpiration,
45-
onnxModelCacheKeyPrefix,
46-
vertx);
47-
return modelCache.getModelRunner(partner.getPbuid());
25+
return modelCache.getModelRunner(onnxModelPath, partner.getPbuid());
4826
}
4927

50-
public Future<Double> retrieveThreshold(Partner partner, ObjectMapper mapper) {
28+
public Future<Double> retrieveThreshold(Partner partner) {
5129
final String thresholdJsonPath = "thresholds_pbuid=" + partner.getPbuid() + ".json";
52-
final ThresholdCache thresholdCache = new ThresholdCache(
53-
thresholdJsonPath,
54-
storage,
55-
gcsBucketName,
56-
mapper,
57-
thresholdsCacheWithExpiration,
58-
thresholdsCacheKeyPrefix,
59-
vertx);
60-
return thresholdCache.getThrottlingThresholds(partner.getPbuid())
30+
return thresholdCache.getThrottlingThresholds(thresholdJsonPath, partner.getPbuid())
6131
.map(partner::getThreshold);
6232
}
6333
}

extra/modules/greenbids-real-time-data/src/main/java/org/prebid/server/hooks/modules/greenbids/real/time/data/model/predictor/ThresholdCache.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ public class ThresholdCache {
2323

2424
String gcsBucketName;
2525

26-
String thresholdPath;
27-
2826
Cache<String, ThrottlingThresholds> cache;
2927

3028
Storage storage;
@@ -38,15 +36,13 @@ public class ThresholdCache {
3836
Vertx vertx;
3937

4038
public ThresholdCache(
41-
String thresholdPath,
4239
Storage storage,
4340
String gcsBucketName,
4441
ObjectMapper mapper,
4542
Cache<String, ThrottlingThresholds> cache,
4643
String thresholdsCacheKeyPrefix,
4744
Vertx vertx) {
4845
this.gcsBucketName = gcsBucketName;
49-
this.thresholdPath = thresholdPath;
5046
this.cache = cache;
5147
this.storage = storage;
5248
this.mapper = mapper;
@@ -55,7 +51,7 @@ public ThresholdCache(
5551
this.vertx = vertx;
5652
}
5753

58-
public Future<ThrottlingThresholds> getThrottlingThresholds(String pbuid) {
54+
public Future<ThrottlingThresholds> getThrottlingThresholds(String thresholdJsonPath, String pbuid) {
5955
final String cacheKey = thresholdsCacheKeyPrefix + pbuid;
6056
final ThrottlingThresholds cachedThrottlingThresholds = cache.getIfPresent(cacheKey);
6157

@@ -65,7 +61,7 @@ public Future<ThrottlingThresholds> getThrottlingThresholds(String pbuid) {
6561

6662
if (isFetching.compareAndSet(false, true)) {
6763
try {
68-
fetchAndCacheThrottlingThresholds(cacheKey);
64+
fetchAndCacheThrottlingThresholds(thresholdJsonPath, cacheKey);
6965
} finally {
7066
isFetching.set(false);
7167
}
@@ -74,17 +70,20 @@ public Future<ThrottlingThresholds> getThrottlingThresholds(String pbuid) {
7470
return Future.failedFuture("ThrottlingThresholds fetching in progress");
7571
}
7672

77-
private void fetchAndCacheThrottlingThresholds(String cacheKey) {
78-
vertx.executeBlocking(this::getBlob)
79-
.map(this::loadThrottlingThresholds)
73+
private void fetchAndCacheThrottlingThresholds(String thresholdJsonPath, String cacheKey) {
74+
vertx.executeBlocking(promise -> {
75+
Blob blob = getBlob(thresholdJsonPath);
76+
promise.complete(blob);
77+
})
78+
.map(blob -> loadThrottlingThresholds((Blob) blob))
8079
.onSuccess(thresholds -> cache.put(cacheKey, thresholds))
81-
.onFailure(error -> logger.error("Failed to fetch thresholds"));
80+
.onFailure(error -> logger.error("Failed to fetch thresholds"));;
8281
}
8382

84-
private Blob getBlob() {
83+
private Blob getBlob(String thresholdJsonPath) {
8584
try {
8685
return Optional.ofNullable(storage.get(gcsBucketName))
87-
.map(bucket -> bucket.get(thresholdPath))
86+
.map(bucket -> bucket.get(thresholdJsonPath))
8887
.orElseThrow(() -> new PreBidException("Bucket not found: " + gcsBucketName));
8988
} catch (StorageException e) {
9089
throw new PreBidException("Error accessing GCS artefact for threshold: ", e);

extra/modules/greenbids-real-time-data/src/main/java/org/prebid/server/hooks/modules/greenbids/real/time/data/v1/GreenbidsRealTimeDataProcessedAuctionRequestHook.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public Future<InvocationResult<AuctionRequestPayload>> call(
8181

8282
return Future.join(
8383
onnxModelRunnerWithThresholds.retrieveOnnxModelRunner(partner),
84-
onnxModelRunnerWithThresholds.retrieveThreshold(partner, mapper))
84+
onnxModelRunnerWithThresholds.retrieveThreshold(partner))
8585
.compose(compositeFuture -> toInvocationResult(
8686
bidRequest,
8787
partner,

extra/modules/greenbids-real-time-data/src/test/java/org/prebid/server/hooks/modules/greenbids/real/time/data/v1/GreenbidsRealTimeDataProcessedAuctionRequestHookTest.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import org.prebid.server.hooks.modules.greenbids.real.time.data.core.ThrottlingThresholds;
2525
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.data.GreenbidsInferenceDataService;
2626
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.predictor.FilterService;
27+
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.predictor.ModelCache;
2728
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.predictor.OnnxModelRunner;
2829
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.predictor.OnnxModelRunnerWithThresholds;
30+
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.predictor.ThresholdCache;
2931
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.result.AnalyticsResult;
3032
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.result.ExplorationResult;
3133
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.result.Ortb2ImpExtResult;
@@ -84,14 +86,22 @@ public void setUp() throws IOException {
8486
final File database = new File("src/test/resources/GeoLite2-Country.mmdb");
8587
final DatabaseReader dbReader = new DatabaseReader.Builder(database).build();
8688
final FilterService filterService = new FilterService();
87-
final OnnxModelRunnerWithThresholds onnxModelRunnerWithThresholds = new OnnxModelRunnerWithThresholds(
88-
modelCacheWithExpiration,
89-
thresholdsCacheWithExpiration,
89+
final ModelCache modelCache = new ModelCache(
9090
storage,
9191
"test_bucket",
92+
modelCacheWithExpiration,
9293
"onnxModelRunner_",
94+
Vertx.vertx());
95+
final ThresholdCache thresholdCache = new ThresholdCache(
96+
storage,
97+
"test_bucket",
98+
mapper,
99+
thresholdsCacheWithExpiration,
93100
"throttlingThresholds_",
94101
Vertx.vertx());
102+
final OnnxModelRunnerWithThresholds onnxModelRunnerWithThresholds = new OnnxModelRunnerWithThresholds(
103+
modelCache,
104+
thresholdCache);
95105
final GreenbidsInferenceDataService greenbidsInferenceDataService = new GreenbidsInferenceDataService(
96106
dbReader,
97107
mapper);

0 commit comments

Comments
 (0)