Skip to content

Commit cb31456

Browse files
committed
Fix issues in Zpublisher for kinesis and patch Adx records if mmdb is available
1 parent 15502db commit cb31456

File tree

10 files changed

+105
-25
lines changed

10 files changed

+105
-25
lines changed

docker-compose.yml

+5-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ services:
2828
image: "jacamars/zerospike:v1"
2929
environment:
3030
BROKERLIST: "kafka:9092"
31+
STATUSCHANNEL: "kafka://[$$BROKERLIST]&topic=status"
3132
ports:
3233
- "6000:6000"
3334
- "6001:6001"
@@ -39,13 +40,15 @@ services:
3940
- rtb_net
4041
depends_on:
4142
- kafka
42-
command: bash -c "./wait-for-it.sh kafka:9092 --t=120 && sleep 1; ./zerospike"
43+
command: bash -c "./wait-for-it.sh $$BROKERLIST --t=120 && sleep 1; ./zerospike"
4344

4445
bidder:
4546
image: "jacamars/rtb4free:v1"
4647
environment:
4748
GDPR_MODE: "false"
4849

50+
BROKERLIST: "kafka:9092"
51+
4952
BIDSCHANNEL: "$BIDSCHANNEL"
5053
WINSCHANNEL: "$WINSCHANNEL"
5154
REQUESTSCHANNEL: "$REQUESTSCHANNEL"
@@ -62,7 +65,6 @@ services:
6265
PUBSUB: "zerospike"
6366
EXTERNAL: "http://localhost:8080"
6467
ACCOUNTING: "NONE"
65-
S3BUCKET: ""
6668

6769
S3BUCKET: ""
6870
S3REGION: ""
@@ -94,7 +96,7 @@ services:
9496
depends_on:
9597
- kafka
9698
- zerospike
97-
command: bash -c "./wait-for-it.sh kafka:9092 --t=120 && ./wait-for-it.sh zerospike:6000 --t 120 && sleep 1; ./rtb4free"
99+
command: bash -c "./wait-for-it.sh $$BROKERLIST --t=120 && ./wait-for-it.sh zerospike:6000 --t 120 && sleep 1; ./rtb4free"
98100
#command: bash -c "./wait-for-it.sh kafka:9092 --timeout=120 && ./wait-for-it.sh zerospike:6000 --timeout 120 && sleep 1; ./rtb4free-jmx"
99101

100102

docker-pwd.yml

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ services:
2121
image: "jacamars/zerospike:v1"
2222
environment:
2323
BROKERLIST: "kafka:9092"
24-
STATUSCHANNEL: "$STATUSCHANNEL"
24+
2525
#ports:
2626
# - "6000:6000"
2727
#- "6001:6001"
@@ -32,7 +32,7 @@ services:
3232
# - "./cache.db:/cache.db"
3333
depends_on:
3434
- kafka
35-
command: bash -c "sleep 5 && ./wait-for-it.sh kafka:9092 -t 120 && sleep 1; ./zerospike"
35+
command: bash -c "sleep 5 && ./wait-for-it.sh $$BROKERLIST -t 120 && sleep 1; ./zerospike"
3636

3737
bidder:
3838
image: "jacamars/rtb4free:v1"
@@ -58,7 +58,7 @@ services:
5858
depends_on:
5959
- kafka
6060
- zerospike
61-
command: bash -c "sleep 5 && ./wait-for-it.sh kafka:9092 -t 120 && ./wait-for-it.sh zerospike:6000 -t 120 && sleep 1; ./rtb4free"
61+
command: bash -c "sleep 5 && ./wait-for-it.sh $$BROKERLIST -t 120 && ./wait-for-it.sh zerospike:6000 -t 120 && sleep 1; ./rtb4free"
6262
#command: bash -c "sleep 5 && ./wait-for-it.sh kafka:9092 -t 120 && ./wait-for-it.sh zerospike:6000 -t 120 && sleep 1; ./rtb4free-jmx"
6363

6464
simulator:

pom.xml

+8-1
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,13 @@
140140
<artifactId>jackson-databind</artifactId>
141141
<version>2.9.8</version>
142142
</dependency>
143+
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-cbor -->
144+
<dependency>
145+
<groupId>com.fasterxml.jackson.dataformat</groupId>
146+
<artifactId>jackson-dataformat-cbor</artifactId>
147+
<version>2.9.8</version>
148+
</dependency>
149+
143150

144151
<!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api -->
145152
<dependency>
@@ -192,7 +199,7 @@
192199
<dependency>
193200
<groupId>com.amazonaws</groupId>
194201
<artifactId>aws-java-sdk</artifactId>
195-
<version>1.11.496</version>
202+
<version>1.11.534</version>
196203
</dependency>
197204

198205
<dependency>

src/TestAdx.java

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import java.io.InputStream;
2+
3+
import javax.xml.bind.DatatypeConverter;
4+
5+
import com.fasterxml.jackson.annotation.JsonInclude.Include;
6+
import com.fasterxml.jackson.databind.DeserializationFeature;
7+
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import com.google.common.io.ByteSource;
9+
import com.jacamars.dsp.rtb.blocks.LookingGlass;
10+
import com.jacamars.dsp.rtb.exchanges.adx.AdxBidRequest;
11+
import com.jacamars.dsp.rtb.exchanges.adx.AdxGeoCodes;
12+
import com.jacamars.dsp.rtb.exchanges.adx.DoubleClick;
13+
import com.jacamars.dsp.rtb.pojo.BidRequest;
14+
import com.jacamars.dsp.rtb.tools.GeoPatch;
15+
import com.jacamars.dsp.rtb.tools.IsoTwo2Iso3;
16+
17+
public class TestAdx {
18+
19+
public static ObjectMapper mapper = new ObjectMapper();
20+
static {
21+
mapper.setSerializationInclusion(Include.NON_NULL);
22+
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
23+
}
24+
25+
public static String proto = "EhBcq7AjAAlPbwqBCRa5BjSdIgP5IdcybU1vemlsbGEvNS4wIChpUGhvbmU7IENQVSBpUGhvbmUgT1MgMTJfMiBsaWtlIE1hYyBPUyBYKSBBcHBsZVdlYktpdC82MDUuMS4xNSAoS0hUTUwsIGxpa2UgR2Vja28pIE1vYmlsZS8xNUUxNDhaKGh0dHBzOi8vaXR1bmVzLmFwcGxlLmNvbS9hcHAvaWQ0NzU5NzY1NzdiAmVuaggIyw0V7sucPmoICOoBFe7LnD5qCAi2CxXuy5w+aggI1QgVeFcpPWoICPENFUGKIz1y4wEIARDAAhCsAhgyGDIiKBYiHhsaRg8NDjAZEBESExQNDg8QERITFBkaGw0ODxAREhMUGRobRhYyZSqQAe0B7gHyAf8BiwLLAswC1gKeA70D2gPdA+ED5QPmA+kD6gORBJkEngSfBKYEsgS0BLwEvQS+BL8E7wS6BcAF5wXpBf8FiAaMBpEGmQadBp4GqAayBroGvAbABvAG9Ab1BvkGSgsQ9+SR69UBKLDjLWABcNjyk1h5GR/5g+Vv8+F5fcAul0UWIh+oAVDQAQH6AQCiAgEAsAIBuAIBwAIB4gIA8gIEAwUGB3gAoAEBqgEbQ0FFU0VOY0E4MGFsMjhzdkFpWGZMN2JHVVZvyAGQ/v////////8B0gECVyfiAZYBMgk0NzU5NzY1Nzc4AVjc2ANY4tgDWNTYA1j8ngSiASRcq7AjAAlPbwqBCRYBBjSdmGE4Ub4lO5aFkkNNbrmNZwE4/a/CAQpQZXJmZWN0MzY12gEQ8hdRFJxSTyeVO/fXIeMfuxoGaXBob25lQAFIAWIFYXBwbGVqBmlwaG9uZXIECAwQAnj3AoABmwWQAQCYAdAP+AEB+AGAx0S4AsC9qATCAlJcq7AjAAlPbwqBCRYCBjSdrlFeQy+kwi9tbKzC3LT6a0dfV1sXnTYhQgH1Dd310qO+GmLM1xbNUxVtI4V26sTstLSS3bA88NQ4PnUtX2Z4eMJZyAKTP5gDAaoDPgowCgoNcbkHQhVAiqjCCgoND64HQhVAiqjCCgoNj60HQhXbhKjCCgoN8bgHQhXbhKjCEgoNgLMHQhWOh6jCsgM2CAESBmlwaG9uZRoFYXBwbGUiBmlwaG9uZSoECAwQAjAAOPcCQJsFSNAPUAFaCGlwaG9uZSA4ugMCQ0HCAxRwdWItMTIzMjI2NTM5OTQxNzMwMsgDrALaA0pBTnktekpGUFhtQUN2MFlXMFBkN19iN2FucnJ2bGxDcHktMkJyTEtfUmNqUlpvRWduQ1NXZVlJOUM5djRKWlJReERfZzhWTHM1Z+ADAg==";
26+
27+
public static void main(String []args) throws Exception {
28+
/** See crypto file **/
29+
30+
new AdxGeoCodes("@ADXGEO", "data/adxgeo.csv");
31+
new LookingGlass("@ZIPCODES","data/zip_codes_states.csv");
32+
33+
AdxBidRequest.lookingGlass = (AdxGeoCodes) LookingGlass.symbols.get("@ADXGEO");
34+
35+
GeoPatch.getInstance("/home/ben/GeoLite2-City.mmdb");
36+
37+
byte[] data = DatatypeConverter.parseBase64Binary(proto);
38+
InputStream in = ByteSource.wrap(data).openStream();
39+
BidRequest.compileBuiltIns();
40+
AdxBidRequest br = new AdxBidRequest(in);
41+
42+
System.out.println(mapper.writer().withDefaultPrettyPrinter().writeValueAsString(br.root));
43+
}
44+
}

src/com/jacamars/dsp/rtb/bidder/ZPublisher.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ public void runKinesisLogger() {
471471
try {
472472
if ((msg = queue.poll()) != null) {
473473
i = 1;
474-
PutRecordsRequest putRecordsRequest = null;
474+
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
475475
while(msg != null) {
476476
str = serialize(msg);
477477
byte [] bytes = str.getBytes();
@@ -486,6 +486,7 @@ public void runKinesisLogger() {
486486
msg = queue.poll();
487487
}
488488
putRecordsRequest.setRecords(putRecordsRequestEntryList);
489+
putRecordsRequest.setStreamName(kinesis.getStream());
489490
PutRecordsResult putRecordsResult = kinesis.getKinesis().putRecords(putRecordsRequest);
490491
putRecordsRequestEntryList.clear();
491492
}

src/com/jacamars/dsp/rtb/common/Configuration.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -756,13 +756,14 @@ else if (strategy.equalsIgnoreCase("bids"))
756756
requstLogStrategy = REQUEST_STRATEGY_BIDS;
757757
else if (strategy.equalsIgnoreCase("WINS"))
758758
requstLogStrategy = REQUEST_STRATEGY_WINS;
759-
} else {
760-
if (strategy.contains(".") == false) {
761-
int n = Integer.parseInt(strategy);
762-
ExchangeLogLevel.getInstance().setStdLevel(n);
763-
} else {
764-
Double perc = Double.parseDouble(strategy);
765-
ExchangeLogLevel.getInstance().setStdLevel(perc.intValue());
759+
else {
760+
if (strategy.contains(".") == false) {
761+
int n = Integer.parseInt(strategy);
762+
ExchangeLogLevel.getInstance().setStdLevel(n);
763+
} else {
764+
Double perc = Double.parseDouble(strategy);
765+
ExchangeLogLevel.getInstance().setStdLevel(perc.intValue());
766+
}
766767
}
767768
}
768769
/********************************************************************/
@@ -906,8 +907,7 @@ public static String substitute(String address) throws Exception {
906907
while (address.contains("$REASONSCHANNEL"))
907908
address = GetEnvironmentVariable(address, "$REASONSCHANNEL", "kafka://[$BROKERLIST]&topic=reasons");
908909
while (address.contains("$LOGCHANNEL"))
909-
address = GetEnvironmentVariable(address, "$LOGCHANNEL", "kafka://[$BROKERLIST]&topic=reasons");
910-
kafka://[$BROKERLIST]&topic=logs
910+
address = GetEnvironmentVariable(address, "$LOGCHANNEL", "kafka://[$BROKERLIST]&topic=logs");
911911

912912
//////////////////////////////////////////////////////////////////////////////////////////////////////////
913913

src/com/jacamars/dsp/rtb/exchanges/adx/AdxBidRequest.java

+23-5
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.jacamars.dsp.rtb.pojo.BidResponse;
3838
import com.jacamars.dsp.rtb.pojo.Impression;
3939
import com.jacamars.dsp.rtb.pojo.Video;
40+
import com.jacamars.dsp.rtb.tools.GeoPatch;
4041

4142
interface Command {
4243
void runCommand(BidRequest br, RealtimeBidding.BidRequest x, ObjectNode root, Map db, String key) throws Exception;
@@ -49,7 +50,7 @@ public class AdxBidRequest extends BidRequest {
4950

5051
static Map<String, Command> methodMap = new HashMap<String, Command>();
5152

52-
static AdxGeoCodes lookingGlass = (AdxGeoCodes) LookingGlass.symbols.get("@ADXGEO");
53+
public static AdxGeoCodes lookingGlass = (AdxGeoCodes) LookingGlass.symbols.get("@ADXGEO");
5354

5455
static {
5556

@@ -173,15 +174,19 @@ public void runCommand(BidRequest br, RealtimeBidding.BidRequest x, ObjectNode r
173174
AdxGeoCode item = lookingGlass.query(geoKey);
174175
if (item != null) {
175176
String type = item.type.toLowerCase();
176-
if (type.equals("city") == false) {
177+
if (type.equals("city")) {
177178
LookingGlass cz = (LookingGlass) LookingGlass.symbols.get("@ZIPCODES");
178179

179-
if (cz != null) {
180+
if (cz != null && postal != null) {
180181
String[] parts = (String[]) cz.query(postal);
181182
if (parts != null) {
182183
geo.put("city", parts[3]);
183184
geo.put("state", parts[4]);
184-
geo.put("county", parts[5]);
185+
geo.put("region", parts[5]);
186+
}
187+
} else {
188+
if (GeoPatch.getInstance() != null) {
189+
GeoPatch.getInstance().patch(device);
185190
}
186191
}
187192
} else {
@@ -199,6 +204,15 @@ public void runCommand(BidRequest br, RealtimeBidding.BidRequest x, ObjectNode r
199204
}
200205
}
201206
geo.put("country", item.iso3);
207+
if (geo.get("city")==null) {
208+
if (GeoPatch.getInstance() != null) {
209+
GeoPatch.getInstance().patch(device);
210+
}
211+
}
212+
}
213+
} else {
214+
if (GeoPatch.getInstance() != null) {
215+
GeoPatch.getInstance().patch(device);
202216
}
203217
}
204218

@@ -939,6 +953,11 @@ protected static String convertToHex(ByteString ip) {
939953
}
940954
return sb.toString();
941955
}
956+
957+
public static void setCrypto(String ekey, String ikey) {
958+
AdxWinObject.encryptionKeyBytes = e_key = javax.xml.bind.DatatypeConverter.parseBase64Binary(ekey);
959+
AdxWinObject.integrityKeyBytes = i_key = javax.xml.bind.DatatypeConverter.parseBase64Binary(ikey);
960+
}
942961

943962
@Override
944963
public void handleConfigExtensions(Map extension) {
@@ -950,7 +969,6 @@ public void handleConfigExtensions(Map extension) {
950969
}
951970

952971
AdxWinObject.encryptionKeyBytes = e_key = javax.xml.bind.DatatypeConverter.parseBase64Binary(ekey);
953-
954972
AdxWinObject.integrityKeyBytes = i_key = javax.xml.bind.DatatypeConverter.parseBase64Binary(ikey);
955973
}
956974

src/com/jacamars/dsp/rtb/services/Zerospike.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public static void main(String[] args) throws Exception {
7575
String strace = null;
7676
String db = null;
7777

78-
String kafka = "kafka://[$BROKERLIST]&topic=status";
78+
String kafka = "$STATUSCHANNEL";
7979
int i = 0;
8080

8181
while (i < args.length) {
@@ -118,7 +118,14 @@ public static void main(String[] args) throws Exception {
118118

119119
}
120120

121+
System.out.println("KAFKA="+kafka);
121122
kafka = Configuration.substitute(kafka);
123+
System.out.println("AFTER="+kafka);
124+
125+
// Do the final substitutions in case we are embedding variables in the connection string.
126+
kafka = Configuration.substitute(kafka);
127+
128+
System.out.println("FINAL="+kafka);
122129

123130
if (ct == 0) {
124131
String value = Configuration.GetEnvironmentVariable("$THREADS", "$THREADS", "1");

src/com/jacamars/dsp/rtb/tools/GeoPatch.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ public void patch(JsonNode idev) {
197197
location = response.getLocation();
198198
if (location != null) {
199199
geo.put("lat", location.getLatitude());
200-
geo.put("lon", location.getLatitude());
200+
geo.put("lon", location.getLongitude());
201201
}
202202
}
203203
} catch (Exception error) {

zerospike.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ services:
2020
image: "jacamars/zerospike:v1"
2121
environment:
2222
BROKERLIST: "kafka:9092"
23+
STATUSCHANNEL: "kafka://[$$BROKERLIST]&topic=status"
2324
THREADS: "2"
2425
ports:
2526
- "6000:6000"
2627
- "6001:6001"
2728
- "6002:6002"
28-
command: bash -c "./wait-for-it.sh kafka:9092 --timeout=120 && sleep 1; ./zerospike"
29+
command: bash -c "./wait-for-it.sh $$BROKERLIST --timeout=120 && sleep 1; ./zerospike"

0 commit comments

Comments
 (0)