Skip to content

Commit 15502db

Browse files
committed
Add stuff
1 parent 19a185f commit 15502db

File tree

12 files changed

+312
-27
lines changed

12 files changed

+312
-27
lines changed

Campaigns/docker.json

+19-7
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,19 @@
2727
"adminPort" : "$ADMINPORT",
2828
"adminSSL" : false,
2929
"password" : "startrekisbetterthanstarwars",
30+
3031
"zeromq" : {
32+
"bidchannel" : "$BIDSCHANNEL",
33+
"winchannel" : "$WINSCHANNEL",
34+
"requests" : "$REQUESTSCHANNEL",
35+
"clicks" : "$CLICKSCHANNEL",
36+
"pixels" : "$PIXELSCHANNEL",
37+
"videoevents": "$VIDEOEVENTSCHANNEL",
38+
"postbackevents": "$POSTBACKEVENTSCHANNEL",
39+
"status" : "$STATUSCHANNEL",
40+
"reasons" : "$REASONSCHANNEL",
41+
42+
3143
"bidchannel" : "kafka://[$BROKERLIST]&topic=bids",
3244
"winchannel" : "kafka://[$BROKERLIST]&topic=wins",
3345
"requests" : "kafka://[$BROKERLIST]&topic=requests",
@@ -49,7 +61,7 @@
4961
"template" : {
5062
"default" : "{creative_forward_url}",
5163
"exchange" : {
52-
"adx" : "<a href='$EXTERNAL/rtb/win/{pub_id}/%%WINNING_PRICE%%/{lat}/{lon}/{ad_id}/{creative_id}/{bid_id}'}'></a><a href='%%CLICK_URL_UNESC%%{redirect_url}></a>{creative_forward_url}",
64+
"adx" : "{creative_forward_url}",
5365
"mopub" : "<a href='mopub template here' </a>",
5466
"mobclix" : "<a href='mobclix template here' </a>",
5567
"nexage" : "<a href='{redirect_url}/exchange={pub}/ad_id={ad_id}/creative_id={creative_id}/price=${AUCTION_PRICE}/lat={lat}/lon={lon}/bid_id={bid_id}?url={creative_forward_url}'><img src='{creative_image_url}' height='{creative_ad_height}' width='{creative_ad_width}'></a><img src='{pixel_url}/exchange={pub}/ad_id={ad_id}/creative_id={creative_id}/{bid_id}/price=${AUCTION_PRICE}/lat={lat}/lon={lon}/bid_id={bid_id}' height='1' width='1'>",
@@ -97,7 +109,7 @@
97109
}, {
98110
"name" : "adx",
99111
"id" : "adx-seat-id",
100-
"bid" : "/rtb/bids/adx=com.jacamars.dsp.rtb.exchanges.adx.DoubleClick",
112+
"bid" : "/rtb/bids/adx=com.jacamars.dsp.rtb.exchanges.adx.DoubleClick&usesPiggyBackWins",
101113
"extension" : {
102114
"e_key" : "$ADX_EKEY",
103115
"i_key" : "$ADX_IKEY"
@@ -251,12 +263,12 @@
251263
"name" : "bidswitch",
252264
"id" : "$BIDSWITCH_ID",
253265
"bid" : "/rtb/bids/bidswitch=com.jacamars.dsp.rtb.exchanges.bidswitch.Bidswitch&!usesEncodedAdm"
254-
}],
266+
}],
255267
"s3" : {
256-
"access_key_id" : "$S3ACCESSKEY",
257-
"secret_access_key" : "$S3SECRETKEY",
258-
"region" : "$S3REGION",
259-
"bucket" : "$SEBUCKET"
268+
"access_key_id" : "$AWSACCESSKEY",
269+
"secret_access_key" : "$AWSSECRETKEY",
270+
"region" : "$AWSREGION",
271+
"bucket" : "$S3BUCKET"
260272
},
261273
"lists" : [ {
262274
"filename" : "data/adxgeo.csv",

Campaigns/payday.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -266,9 +266,9 @@
266266
"bid" : "/rtb/bids/bidswitch=com.jacamars.dsp.rtb.exchanges.bidswitch.Bidswitch&!usesEncodedAdm"
267267
}],
268268
"s3" : {
269-
"access_key_id" : "$S3ACCESSKEY",
270-
"secret_access_key" : "$S3SECRETKEY",
271-
"region" : "$S3REGION",
269+
"access_key_id" : "$AWSACCESSKEY",
270+
"secret_access_key" : "$AWSSECRETKEY",
271+
"region" : "$AWSREGION",
272272
"bucket" : "$S3BUCKET"
273273
},
274274
"lists" : [ {

docker-compose.yml

+20
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,34 @@ services:
4545
image: "jacamars/rtb4free:v1"
4646
environment:
4747
GDPR_MODE: "false"
48+
49+
BIDSCHANNEL: "$BIDSCHANNEL"
50+
WINSCHANNEL: "$WINSCHANNEL"
51+
REQUESTSCHANNEL: "$REQUESTSCHANNEL"
52+
CLICKSCHANNEL: "$CLICKSCHANNEL"
53+
PIXELSCHANNEL: "$PIXELSCHANNEL"
54+
VIDEOEVENTSCHANNEL: "$VIDEOEVENTSCHANNEL"
55+
POSTBACKEVENTSCHANNEL: "$POSTBACKEVENTSCHANNEL"
56+
STATUSCHANNEL: "$STATUSCHANNEL"
57+
REASONSCHANNEL: "$REASONSCHANNEL"
58+
LOGCHANNEL: "$LOGCHANNEL"
59+
4860
BROKERLIST: "kafka:9092"
61+
4962
PUBSUB: "zerospike"
5063
EXTERNAL: "http://localhost:8080"
5164
ACCOUNTING: "NONE"
65+
S3BUCKET: ""
66+
5267
S3BUCKET: ""
5368
S3REGION: ""
5469
S3SECRETKEY: ""
5570
S3ACCESSKEY: ""
71+
72+
AWSACCESSKEY: ""
73+
AWSSECRETKEY: ""
74+
AWSREGION: ""
75+
5676
GOOGLE_EKEY: ""
5777
GOOGLE_IKEY: ""
5878
OPENX_EKEY: ""

docker-pwd.yml

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ services:
2121
image: "jacamars/zerospike:v1"
2222
environment:
2323
BROKERLIST: "kafka:9092"
24+
STATUSCHANNEL: "$STATUSCHANNEL"
2425
#ports:
2526
# - "6000:6000"
2627
#- "6001:6001"

log4j.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@ log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1
1717

1818
log4j.appender.ZEROMQ_APPENDER=com.jacamars.dsp.rtb.tools.ZPublisher4J
1919
log4j.appender.ZEROMQ_APPENDER.layout=org.apache.log4j.PatternLayout
20-
log4j.appender.ZEROMQ_APPENDER.publisher=kafka://[$BROKERLIST]&topic=logs
20+
log4j.appender.ZEROMQ_APPENDER.publisher=$LOGCHANNEL

src/com/jacamars/dsp/rtb/bidder/ZPublisher.java

+79
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44
import java.io.File;
55
import java.io.FileOutputStream;
66
import java.io.PrintWriter;
7+
import java.nio.ByteBuffer;
78
import java.text.DecimalFormat;
89
import java.text.SimpleDateFormat;
10+
import java.util.ArrayList;
911
import java.util.Date;
1012
import java.util.HashMap;
13+
import java.util.List;
1114
import java.util.Map;
1215
import java.util.concurrent.ConcurrentLinkedQueue;
1316
import java.util.concurrent.TimeUnit;
@@ -20,6 +23,9 @@
2023

2124
import com.jacamars.dsp.rtb.common.Configuration;
2225
import com.jacamars.dsp.rtb.common.HttpPostGet;
26+
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
27+
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
28+
import com.amazonaws.services.kinesis.model.PutRecordsResult;
2329
import com.fasterxml.jackson.annotation.JsonInclude.Include;
2430

2531
import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -92,6 +98,7 @@ public class ZPublisher implements Runnable, Callback {
9298

9399
boolean isPipe = false;
94100
PrintWriter pipe;
101+
KinesisConfig kinesis;
95102

96103
/**
97104
* Default constructor
@@ -137,6 +144,9 @@ public ZPublisher(String address) throws Exception {
137144
mapper.setSerializationInclusion(Include.NON_NULL);
138145
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
139146

147+
if (address.startsWith("kinesis://")) {
148+
doKineses(address);
149+
} else
140150
if (address.startsWith("kafka://")) {
141151
doKafka(address);
142152
} else
@@ -207,6 +217,10 @@ public ZPublisher(String address) throws Exception {
207217
if (producer == null && !isPipe)
208218
ping = new Pinger(this);
209219
}
220+
221+
void doKineses(String address) throws Exception {
222+
kinesis = new KinesisConfig(address);
223+
}
210224

211225
// kafka://[a:b,b:c]&topic=bids&partition=0
212226
void doKafka(String saddress) throws Exception {
@@ -423,6 +437,9 @@ public void run() {
423437
runPipeLogger();
424438
}
425439

440+
if (kinesis != null) { // kinesis
441+
runKinesisLogger();
442+
}
426443

427444
if (producer != null) // kafka
428445
runKafkaLogger();
@@ -441,6 +458,68 @@ public void run() {
441458
error.printStackTrace();
442459
}
443460
}
461+
462+
/**
463+
* Run the kineses logger in a loop
464+
*/
465+
public void runKinesisLogger() {
466+
Object msg = null;
467+
String str = null;
468+
int i;
469+
List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
470+
while (!me.isInterrupted()) {
471+
try {
472+
if ((msg = queue.poll()) != null) {
473+
i = 1;
474+
PutRecordsRequest putRecordsRequest = null;
475+
while(msg != null) {
476+
str = serialize(msg);
477+
byte [] bytes = str.getBytes();
478+
PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
479+
putRecordsRequestEntry.setPartitionKey(kinesis.getPartition());
480+
putRecordsRequestEntry.setData(ByteBuffer.wrap(bytes));
481+
putRecordsRequestEntryList.add(putRecordsRequestEntry);
482+
483+
if (i++ == 100)
484+
msg = null;
485+
else
486+
msg = queue.poll();
487+
}
488+
putRecordsRequest.setRecords(putRecordsRequestEntryList);
489+
PutRecordsResult putRecordsResult = kinesis.getKinesis().putRecords(putRecordsRequest);
490+
putRecordsRequestEntryList.clear();
491+
}
492+
Thread.sleep(1);
493+
494+
495+
/*while ((msg = queue.poll()) != null) {
496+
str = serialize(msg);
497+
var bytes = str.getBytes();
498+
PutRecordRequest putRecord = new PutRecordRequest();
499+
putRecord.setStreamName(kinesis.getStream());
500+
putRecord.setPartitionKey(kinesis.getPartition());
501+
putRecord.setData(ByteBuffer.wrap(bytes));
502+
503+
try {
504+
kinesis.getKinesis().putRecord(putRecord);
505+
} catch (Exception ex) {
506+
ex.printStackTrace();
507+
}
508+
}
509+
Thread.sleep(1); */
510+
} catch (Exception e) {
511+
e.printStackTrace();
512+
// return;
513+
}
514+
}
515+
}
516+
517+
public String serialize(Object msg) {
518+
if (msg instanceof String)
519+
return (String) msg;
520+
return Tools.serialize(mapper, msg);
521+
}
522+
444523

445524
/**
446525
* Run the Redis logger in a loop.

src/com/jacamars/dsp/rtb/common/Configuration.java

+58-9
Original file line numberDiff line numberDiff line change
@@ -884,6 +884,32 @@ public static String substitute(String address) throws Exception {
884884

885885
if (address == null)
886886
return address;
887+
888+
///////////////////////////////////////////////////////////////////////////////////////////////////
889+
890+
while (address.contains("$BIDSCHANNEL"))
891+
address = GetEnvironmentVariable(address, "$BIDSCHANNEL", "kafka://[$BROKERLIST]&topic=bids");
892+
while (address.contains("$WINSCHANNEL"))
893+
address = GetEnvironmentVariable(address, "$WINSCHANNEL", "kafka://[$BROKERLIST]&topic=wins");
894+
while (address.contains("$REQUESTSCHANNEL"))
895+
address = GetEnvironmentVariable(address, "$REQUESTSCHANNEL", "kafka://[$BROKERLIST]&topic=requests");
896+
while (address.contains("$CLICKSCHANNEL"))
897+
address = GetEnvironmentVariable(address, "$CLICKSCHANNEL", "kafka://[$BROKERLIST]&topic=clicks");
898+
while (address.contains("$PIXELSCHANNEL"))
899+
address = GetEnvironmentVariable(address, "$PIXELSCHANNEL", "kafka://[$BROKERLIST]&topic=pixels");
900+
while (address.contains("$VIDEOEVENTSCHANNEL"))
901+
address = GetEnvironmentVariable(address, "$VIDEOEVENTSCHANNEL", "kafka://[$BROKERLIST]&topic=videoevents");
902+
while (address.contains("$POSTBACKEVENTSCHANNEL"))
903+
address = GetEnvironmentVariable(address, "$POSTBACKEVENTSCHANNEL", "kafka://[$BROKERLIST]&topic=postbackevents");
904+
while (address.contains("$STATUSCHANNEL"))
905+
address = GetEnvironmentVariable(address, "$STATUSCHANNEL", "kafka://[$BROKERLIST]&topic=status");
906+
while (address.contains("$REASONSCHANNEL"))
907+
address = GetEnvironmentVariable(address, "$REASONSCHANNEL", "kafka://[$BROKERLIST]&topic=reasons");
908+
while (address.contains("$LOGCHANNEL"))
909+
address = GetEnvironmentVariable(address, "$LOGCHANNEL", "kafka://[$BROKERLIST]&topic=reasons");
910+
kafka://[$BROKERLIST]&topic=logs
911+
912+
//////////////////////////////////////////////////////////////////////////////////////////////////////////
887913

888914
while (address.contains("$GEOPATCH"))
889915
address = GetEnvironmentVariable(address, "$GEOPATCH", "");
@@ -907,14 +933,37 @@ public static String substitute(String address) throws Exception {
907933
while (address.contains("$BIDSWITCH_ID"))
908934
address = GetEnvironmentVariable(address, "$BIDSWITCH_ID", "bidswitch-id");
909935

910-
while (address.contains("$S3REGION"))
911-
address = GetEnvironmentVariable(address, "$S3REGION", "");
912-
while (address.contains("$S3BUCKET"))
913-
address = GetEnvironmentVariable(address, "$S3BUCKET", "");
914-
while (address.contains("$S3ACCESSKEY"))
915-
address = GetEnvironmentVariable(address, "$S3ACCESSKEY", "");
916-
while (address.contains("$S3SECRETKEY"))
917-
address = GetEnvironmentVariable(address, "$S3SECRETKEY", "");
936+
/////////////////////////////////////////////////////////////////////////////
937+
938+
while(address.contains("$S3BUCKET"))
939+
address = GetEnvironmentVariable(address,"$S3BUCKET", "");
940+
941+
while(address.contains("$AWSACCESSKEY"))
942+
address = GetEnvironmentVariable(address,"$AWSACCESSKEY", "");
943+
944+
while(address.contains("$AWSSECRETKEY"))
945+
address = GetEnvironmentVariable(address,"$AWSSECRETKEY", "");
946+
947+
while(address.contains("$AWSREGION"))
948+
address = GetEnvironmentVariable(address,"$AWSREGION", Regions.US_EAST_1.getName());
949+
950+
while(address.contains("$AWSKINESIS_STREAM"))
951+
address = GetEnvironmentVariable(address,"$AWS_KINESIS_STREAM", "");
952+
953+
while(address.contains("$AWSKINESIS_PARTITION"))
954+
address = GetEnvironmentVariable(address,"$AWS_KINESESIS_PARTITION", "part-1");
955+
956+
while(address.contains("AWSKINESIS_SHARD"))
957+
address = GetEnvironmentVariable(address,"$AWS_KINESIS_SHARD", "shardId-000000000000");
958+
959+
while(address.contains("AWS_KINESIS_ITERATOR"))
960+
address = GetEnvironmentVariable(address,"$AWS_KENESIS_ITERATOR", "LATEST");
961+
962+
while(address.contains("AWSKINESIS_RECORDS"))
963+
address = GetEnvironmentVariable(address,"$AWS_KENSIS_RECORDS", "1");
964+
965+
/////////////////////////////////////////////////////////////////////////////
966+
918967

919968
while (address.contains("$FREQGOV"))
920969
address = GetEnvironmentVariable(address, "$FREQGOV", "true");
@@ -1006,7 +1055,7 @@ public static String GetEnvironmentVariable(String address, String varName) {
10061055
*/
10071056
public static String GetEnvironmentVariable(String address, String varName, String altName) {
10081057
String test = GetEnvironmentVariable(address, varName);
1009-
if (test == null) {
1058+
if (test == null && altName != null) {
10101059
test = address.replace(varName, altName);
10111060
}
10121061
return test;

src/com/jacamars/dsp/rtb/exchanges/adx/AdxBidRequest.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import com.jacamars.dsp.rtb.pojo.Video;
4040

4141
interface Command {
42-
void runCommand(BidRequest br, RealtimeBidding.BidRequest x, ObjectNode root, Map db, String key);
42+
void runCommand(BidRequest br, RealtimeBidding.BidRequest x, ObjectNode root, Map db, String key) throws Exception;
4343
}
4444

4545
public class AdxBidRequest extends BidRequest {
@@ -54,7 +54,7 @@ public class AdxBidRequest extends BidRequest {
5454
static {
5555

5656
methodMap.put("device", new Command() {
57-
public void runCommand(BidRequest br, RealtimeBidding.BidRequest x, ObjectNode root, Map d, String key) {
57+
public void runCommand(BidRequest br, RealtimeBidding.BidRequest x, ObjectNode root, Map d, String key) throws Exception {
5858
String ip = AdxBidRequest.convertIp(x.getIp());
5959
String ua = x.getUserAgent();
6060

@@ -97,12 +97,15 @@ public void runCommand(BidRequest br, RealtimeBidding.BidRequest x, ObjectNode r
9797

9898
if (m.hasEncryptedAdvertisingId()) {
9999
ByteString bs = m.getEncryptedAdvertisingId();
100+
byte[] barry = bs.toByteArray();
100101
String id;
101102
try {
102-
id = AdxWinObject.decryptAdvertisingId(bs.toByteArray());
103+
id = AdxWinObject.decryptAdvertisingId(barry);
103104
device.put("ifa", id);
104105
} catch (Exception e) {
105-
e.printStackTrace();
106+
if (! RTBServer.spurious("AdxBidRequest.encryptedadid", 60)) {
107+
e.printStackTrace();
108+
}
106109
}
107110
}
108111

@@ -876,7 +879,7 @@ void handleFeedBack() {
876879
}
877880
}
878881

879-
void internalSetup() {
882+
void internalSetup() throws Exception {
880883

881884
List<String> beenThere = new ArrayList<String>();
882885
for (int i = 0; i < keys.size(); i++) {

src/com/jacamars/dsp/rtb/exchanges/adx/DoubleClick.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,13 @@ public DoubleClick(InputStream in) throws Exception {
4848
* @throws Exception on stream reading errors
4949
*/
5050
@Override
51-
public DoubleClick copy(InputStream in) {
51+
public DoubleClick copy(InputStream in) throws Exception {
5252
try {
5353
return new DoubleClick(in);
5454
} catch (Exception error) {
5555
logger.debug("Error copying Google DoubleClick, something is not configured correctly: {}",error.getMessage());
56+
} catch (Error x) {
57+
logger.debug("Error copying Google DoubleClick, something is not configured correctly: {}",x.getMessage());
5658
}
5759
return null;
5860
}

0 commit comments

Comments
 (0)