diff --git a/abc.txt b/abc.txt
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/auth/auth-client/pom.xml b/auth/auth-client/pom.xml
index 14e80f2c35..a0ec2a4214 100644
--- a/auth/auth-client/pom.xml
+++ b/auth/auth-client/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/auth/auth-core/pom.xml b/auth/auth-core/pom.xml
index 257be185db..a2640a5b68 100644
--- a/auth/auth-core/pom.xml
+++ b/auth/auth-core/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/auth/auth-store/pom.xml b/auth/auth-store/pom.xml
index 3b9531494e..3914091486 100644
--- a/auth/auth-store/pom.xml
+++ b/auth/auth-store/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/auth/auth-util/pom.xml b/auth/auth-util/pom.xml
index fe30721176..90c958a022 100644
--- a/auth/auth-util/pom.xml
+++ b/auth/auth-util/pom.xml
@@ -3,7 +3,7 @@
emodb
com.bazaarvoice.emodb
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../pom.xml
4.0.0
diff --git a/blob-api/pom.xml b/blob-api/pom.xml
index 85123d5a42..e738315218 100644
--- a/blob-api/pom.xml
+++ b/blob-api/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/blob-clients/blob-client-common/pom.xml b/blob-clients/blob-client-common/pom.xml
index 2551dd9b1f..85638eb20d 100644
--- a/blob-clients/blob-client-common/pom.xml
+++ b/blob-clients/blob-client-common/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/blob-clients/blob-client-jersey2/pom.xml b/blob-clients/blob-client-jersey2/pom.xml
index 1469a20558..9dae41c49e 100644
--- a/blob-clients/blob-client-jersey2/pom.xml
+++ b/blob-clients/blob-client-jersey2/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/blob-clients/blob-client/pom.xml b/blob-clients/blob-client/pom.xml
index 8eb86f2ce1..983e1141bb 100644
--- a/blob-clients/blob-client/pom.xml
+++ b/blob-clients/blob-client/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/blob/pom.xml b/blob/pom.xml
index e4396c74cf..8a09d259f4 100644
--- a/blob/pom.xml
+++ b/blob/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/blob/src/main/java/com/bazaarvoice/emodb/blob/core/DefaultBlobStore.java b/blob/src/main/java/com/bazaarvoice/emodb/blob/core/DefaultBlobStore.java
index 8eba374154..6ab72adaa1 100644
--- a/blob/src/main/java/com/bazaarvoice/emodb/blob/core/DefaultBlobStore.java
+++ b/blob/src/main/java/com/bazaarvoice/emodb/blob/core/DefaultBlobStore.java
@@ -474,4 +474,4 @@ private static void checkLegalBlobId(String blobId) {
"Blob IDs must be ASCII strings between 1 and 255 characters in length. " +
"Whitespace, ISO control characters and certain punctuation characters that aren't generally allowed in file names are excluded.");
}
-}
+}
\ No newline at end of file
diff --git a/cachemgr/pom.xml b/cachemgr/pom.xml
index c05b60ea79..a556af875a 100644
--- a/cachemgr/pom.xml
+++ b/cachemgr/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/common/api/pom.xml b/common/api/pom.xml
index b8892b6428..9e3eb49331 100644
--- a/common/api/pom.xml
+++ b/common/api/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/common/astyanax/pom.xml b/common/astyanax/pom.xml
index b6eff20acf..fb947bedb4 100644
--- a/common/astyanax/pom.xml
+++ b/common/astyanax/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/common/client-jax-rs-2/pom.xml b/common/client-jax-rs-2/pom.xml
index 1f2e6db668..6bbacebeb6 100644
--- a/common/client-jax-rs-2/pom.xml
+++ b/common/client-jax-rs-2/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/common/client-jersey2/pom.xml b/common/client-jersey2/pom.xml
index 1a6fe97c68..9804488145 100644
--- a/common/client-jersey2/pom.xml
+++ b/common/client-jersey2/pom.xml
@@ -5,7 +5,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/common/client/pom.xml b/common/client/pom.xml
index 640eeb51a0..bacc4c21d9 100644
--- a/common/client/pom.xml
+++ b/common/client/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/common/dropwizard/pom.xml b/common/dropwizard/pom.xml
index 8bb610aba2..d799c4406c 100644
--- a/common/dropwizard/pom.xml
+++ b/common/dropwizard/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/common/jersey-client/pom.xml b/common/jersey-client/pom.xml
index aa9d419736..63c76a2461 100644
--- a/common/jersey-client/pom.xml
+++ b/common/jersey-client/pom.xml
@@ -5,7 +5,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/common/json/pom.xml b/common/json/pom.xml
index 917adb1936..9b280a591f 100644
--- a/common/json/pom.xml
+++ b/common/json/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/common/stash/pom.xml b/common/stash/pom.xml
index bb9f0e5b7b..252172f0e1 100644
--- a/common/stash/pom.xml
+++ b/common/stash/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/common/uuid/pom.xml b/common/uuid/pom.xml
index 69c7151265..bc778252ac 100644
--- a/common/uuid/pom.xml
+++ b/common/uuid/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/common/zookeeper/pom.xml b/common/zookeeper/pom.xml
index 02e713b07c..ada74ff979 100644
--- a/common/zookeeper/pom.xml
+++ b/common/zookeeper/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/databus-api/pom.xml b/databus-api/pom.xml
index 27ef872a85..af2eaa8064 100644
--- a/databus-api/pom.xml
+++ b/databus-api/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/databus-client-common/pom.xml b/databus-client-common/pom.xml
index 5af9b9d379..3a3f4bcb7a 100644
--- a/databus-client-common/pom.xml
+++ b/databus-client-common/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/databus-client-jersey2/pom.xml b/databus-client-jersey2/pom.xml
index 7c2794e645..56c4926fd1 100644
--- a/databus-client-jersey2/pom.xml
+++ b/databus-client-jersey2/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/databus-client/pom.xml b/databus-client/pom.xml
index 46b1d8c25d..725086ecff 100644
--- a/databus-client/pom.xml
+++ b/databus-client/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/databus/pom.xml b/databus/pom.xml
index e966f653bb..464a645e31 100644
--- a/databus/pom.xml
+++ b/databus/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/datacenter/pom.xml b/datacenter/pom.xml
index d0582217b8..ff48171432 100644
--- a/datacenter/pom.xml
+++ b/datacenter/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/event/pom.xml b/event/pom.xml
index 8e3cd03e8b..b43ec8565c 100644
--- a/event/pom.xml
+++ b/event/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/job-api/pom.xml b/job-api/pom.xml
index 8ac8b8687e..2f2e764391 100644
--- a/job-api/pom.xml
+++ b/job-api/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/job/pom.xml b/job/pom.xml
index e4a44a41d2..a79351ca1f 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/kafka/pom.xml b/kafka/pom.xml
index b553098418..43e1a3a6c3 100644
--- a/kafka/pom.xml
+++ b/kafka/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/megabus/pom.xml b/megabus/pom.xml
index 2c91fd1561..c1e6f3a0b5 100644
--- a/megabus/pom.xml
+++ b/megabus/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/parent/pom.xml b/parent/pom.xml
index 27d58c9133..6784698424 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -11,7 +11,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
pom
EmoDB Parent
@@ -635,11 +635,22 @@
aws-java-sdk-sns
${aws-sdk.version}
+
+ com.amazonaws
+ aws-java-sdk-stepfunctions
+ ${aws-sdk.version}
+
com.amazonaws
aws-java-sdk-sqs
${aws-sdk.version}
+
+
+ com.amazonaws
+ aws-java-sdk-ssm
+ ${aws-sdk.version}
+
com.amazonaws
aws-java-sdk-sts
diff --git a/plugins/pom.xml b/plugins/pom.xml
index e67d3d650d..1cefbd9e21 100644
--- a/plugins/pom.xml
+++ b/plugins/pom.xml
@@ -4,7 +4,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/pom.xml b/pom.xml
index 5f048e3d7b..aba38f7f68 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
parent/pom.xml
diff --git a/quality/integration/pom.xml b/quality/integration/pom.xml
index f2fca66aa4..9235aea485 100644
--- a/quality/integration/pom.xml
+++ b/quality/integration/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../../parent/pom.xml
diff --git a/quality/pom.xml b/quality/pom.xml
index ffbd280391..5024ecba8f 100644
--- a/quality/pom.xml
+++ b/quality/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/queue-api/pom.xml b/queue-api/pom.xml
index b7b7eb2b33..a081b19e3f 100644
--- a/queue-api/pom.xml
+++ b/queue-api/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthDedupQueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthDedupQueueService.java
index 41991d5b95..3b1fd2ffde 100644
--- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthDedupQueueService.java
+++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthDedupQueueService.java
@@ -64,4 +64,6 @@ public interface AuthDedupQueueService {
/** Delete all messages in the queue, for debugging/testing. */
void purge(@Credential String apiKey, String queue);
+
+ void sendAll(String apiKey, String queue, Collection> messages, boolean isFlush);
}
diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthQueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthQueueService.java
index a077eb2062..1bae1893f1 100644
--- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthQueueService.java
+++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthQueueService.java
@@ -18,6 +18,7 @@ public interface AuthQueueService {
void sendAll(@Credential String apiKey, String queue, Collection> messages);
void sendAll(@Credential String apiKey, Map> messagesByQueue);
+ void sendAll(@Credential String apiKey, String queue, Collection> messages, boolean isFlush);
/**
* Counts pending messages for the specified queue. The count will include messages that are currently claimed
@@ -64,4 +65,6 @@ public interface AuthQueueService {
/** Delete all messages in the queue, for debugging/testing. */
void purge(@Credential String apiKey, String queue);
+
+
}
diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java
index 3fcd38b5a4..4b0af997f9 100644
--- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java
+++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java
@@ -1,5 +1,6 @@
package com.bazaarvoice.emodb.queue.api;
+import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
@@ -15,6 +16,8 @@ public interface BaseQueueService {
void sendAll(Map> messagesByQueue);
+ void sendAll(String queue, Collection> messages, boolean isFlush);
+
/**
* Counts pending messages for the specified queue. The count will include messages that are currently claimed
* and not returned by the {@link #poll} method.
diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java
index 12ab97a45a..a6dc77515b 100644
--- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java
+++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java
@@ -15,6 +15,8 @@ public interface DedupQueueService extends BaseQueueService {
void sendAll(Map> messagesByQueue);
+ void sendAll(String queue, Collection>messages, boolean isFlush);
+
/**
* Counts pending messages for the specified queue. The count will include messages that are currently claimed
* and not returned by the {@link #poll} method.
diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java
index c87740330c..4d533c755e 100644
--- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java
+++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java
@@ -13,8 +13,12 @@ public interface QueueService extends BaseQueueService {
void sendAll(String queue, Collection> messages);
+
void sendAll(Map> messagesByQueue);
+ //Overloaded sendAll method to send to cassandra
+ void sendAll(String queue, Collection> messages, boolean isFlush);
+
/**
* Counts pending messages for the specified queue. The count will include messages that are currently claimed
* and not returned by the {@link #poll} method.
diff --git a/queue-client-common/pom.xml b/queue-client-common/pom.xml
index d71aae5218..12a93a7f62 100644
--- a/queue-client-common/pom.xml
+++ b/queue-client-common/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java
index b8aaa83667..7e01cd8b91 100644
--- a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java
+++ b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java
@@ -75,6 +75,25 @@ public void sendAll(String apiKey, String queue, Collection> messages) {
}
}
+ public void sendAll(String apiKey, String queue, Collection> messages, boolean isFlush) {
+ requireNonNull(queue, "queue");
+ requireNonNull(messages, "messages");
+ if (messages.isEmpty()) {
+ return;
+ }
+ try {
+ URI uri = _queueService.clone()
+ .segment(queue, "sendbatch")
+ .build();
+ _client.resource(uri)
+ .type(MediaType.APPLICATION_JSON_TYPE)
+ .header(ApiKeyRequest.AUTHENTICATION_HEADER, apiKey)
+ .post(messages);
+ } catch (EmoClientException e) {
+ throw convertException(e);
+ }
+ }
+
// Any server can handle sending messages, no need for @PartitionKey
public void sendAll(String apiKey, Map> messagesByQueue) {
requireNonNull(messagesByQueue, "messagesByQueue");
diff --git a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java
index 49d4c240f2..01fa7830eb 100644
--- a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java
+++ b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java
@@ -38,6 +38,11 @@ public void sendAll(Map> messagesByQueue) {
_authDedupQueueService.sendAll(_apiKey, messagesByQueue);
}
+ @Override
+ public void sendAll(String queue, Collection> messages, boolean isFlush) {
+ _authDedupQueueService.sendAll(_apiKey, queue, messages, isFlush);
+ }
+
@Override
public MoveQueueStatus getMoveStatus(String reference) {
return _authDedupQueueService.getMoveStatus(_apiKey, reference);
diff --git a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueClient.java b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueClient.java
index 683af2162d..92769441dd 100644
--- a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueClient.java
+++ b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueClient.java
@@ -32,6 +32,7 @@ public QueueClient(URI endPoint, boolean partitionSafe, EmoClient client) {
super(endPoint, partitionSafe, client);
}
+
@Override
public long getMessageCount(String apiKey, String queue) {
// Any server can handle this request, no need for @PartitionKey
diff --git a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java
index 29f8fd4ae6..714897a36e 100644
--- a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java
+++ b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java
@@ -38,6 +38,11 @@ public void sendAll(Map> messagesByQueue) {
_authQueueService.sendAll(_apiKey, messagesByQueue);
}
+ @Override
+ public void sendAll(String queue, Collection> messages, boolean isFlush) {
+ _authQueueService.sendAll(_apiKey, queue, messages, isFlush);
+ }
+
@Override
public MoveQueueStatus getMoveStatus(String reference) {
return _authQueueService.getMoveStatus(_apiKey, reference);
diff --git a/queue-client-jersey2/pom.xml b/queue-client-jersey2/pom.xml
index 2eb7d7ceef..ccf5ec3441 100644
--- a/queue-client-jersey2/pom.xml
+++ b/queue-client-jersey2/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java
index bd4859d58f..7e315932b7 100644
--- a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java
+++ b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java
@@ -67,6 +67,23 @@ public void sendAll(String apiKey, String queue, Collection> messages) {
.post(messages));
}
+ public void sendAll(String apiKey, String queue, Collection> messages, boolean isFlush) {
+ requireNonNull(queue, "queue");
+ requireNonNull(messages, "messages");
+ if (messages.isEmpty()) {
+ return;
+ }
+ URI uri = _queueService.clone()
+ .segment(queue, "sendbatch")
+ .build();
+
+ Failsafe.with(_retryPolicy)
+ .run(() -> _client.resource(uri)
+ .type(MediaType.APPLICATION_JSON_TYPE)
+ .header(ApiKeyRequest.AUTHENTICATION_HEADER, apiKey)
+ .post(messages));
+ }
+
public void sendAll(String apiKey, Map> messagesByQueue) {
requireNonNull(messagesByQueue, "messagesByQueue");
if (messagesByQueue.isEmpty()) {
diff --git a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java
index f37405182b..19df050f64 100644
--- a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java
+++ b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java
@@ -36,6 +36,11 @@ public void sendAll(Map> messagesByQueue) {
_authDedupQueueService.sendAll(_apiKey, messagesByQueue);
}
+ @Override
+ public void sendAll(String queue, Collection> messages, boolean isFlush) {
+ _authDedupQueueService.sendAll(_apiKey, queue, messages, isFlush);
+ }
+
@Override
public MoveQueueStatus getMoveStatus(String reference) {
return _authDedupQueueService.getMoveStatus(_apiKey, reference);
diff --git a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java
index 144b991f32..fef04a42e1 100644
--- a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java
+++ b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java
@@ -35,6 +35,11 @@ public void sendAll(Map> messagesByQueue) {
_authQueueService.sendAll(_apiKey, messagesByQueue);
}
+ @Override
+ public void sendAll(String queue, Collection> messages, boolean isFlush) {
+ _authQueueService.sendAll(_apiKey, queue, messages, isFlush);
+ }
+
@Override
public MoveQueueStatus getMoveStatus(String reference) {
return _authQueueService.getMoveStatus(_apiKey, reference);
diff --git a/queue-client/pom.xml b/queue-client/pom.xml
index 53a53dc625..564a3eb5be 100644
--- a/queue-client/pom.xml
+++ b/queue-client/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
diff --git a/queue/pom.xml b/queue/pom.xml
index c38a0b5332..3fd13f9124 100644
--- a/queue/pom.xml
+++ b/queue/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.185-SNAPSHOT
../parent/pom.xml
@@ -75,20 +75,24 @@
com.fasterxml.jackson.core
jackson-annotations
+
+ com.fasterxml.jackson.core
+ jackson-core
+
com.fasterxml.jackson.core
jackson-databind
${jackson.databind.version}
-
-
- com.fasterxml.jackson.core
- jackson-core
-
-
- com.fasterxml.jackson.core
- jackson-annotations
-
-
+
+
+
+
+
+
+
+
+
+
javax.validation
@@ -98,6 +102,11 @@
org.apache.curator
curator-framework
+
+ org.slf4j
+ slf4j-api
+
+
@@ -110,5 +119,21 @@
testng
test
+
+ org.apache.kafka
+ kafka-clients
+
+
+ com.amazonaws
+ aws-java-sdk-core
+
+
+ com.amazonaws
+ aws-java-sdk-stepfunctions
+
+
+ com.amazonaws
+ aws-java-sdk-ssm
+
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java
index 20ff816a5c..b42d33b02c 100644
--- a/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java
@@ -21,6 +21,9 @@
import com.bazaarvoice.emodb.queue.core.DefaultDedupQueueService;
import com.bazaarvoice.emodb.queue.core.DefaultQueueService;
import com.bazaarvoice.emodb.queue.core.QueueChannelConfiguration;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
+import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.bazaarvoice.ostrich.HostDiscovery;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Supplier;
@@ -82,6 +85,14 @@ protected void configure() {
bind(new TypeLiteral>() {}).annotatedWith(DedupEnabled.class).toInstance(Suppliers.ofInstance(true));
install(new EventStoreModule("bv.emodb.queue", _metricRegistry));
+ // Bind Kafka services
+ bind (KafkaAdminService.class).asEagerSingleton();
+ bind(KafkaProducerService.class).asEagerSingleton();
+
+ // Bind Step Function Service
+ bind(StepFunctionService.class).asEagerSingleton();
+
+
// Bind the Queue instance that the rest of the application will consume
bind(QueueService.class).to(DefaultQueueService.class).asEagerSingleton();
expose(QueueService.class);
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java
index f07083616d..7a1f511fd8 100644
--- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java
@@ -17,9 +17,13 @@
import com.bazaarvoice.emodb.queue.api.MoveQueueStatus;
import com.bazaarvoice.emodb.queue.api.Names;
import com.bazaarvoice.emodb.queue.api.UnknownMoveException;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
+import com.bazaarvoice.emodb.queue.core.ssm.ParameterStoreUtil;
+import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.bazaarvoice.emodb.sortedq.core.ReadOnlyQueueException;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.cache.CacheBuilder;
@@ -33,39 +37,45 @@
import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.Duration;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
abstract class AbstractQueueService implements BaseQueueService {
+ private final Logger _log = LoggerFactory.getLogger(AbstractQueueService.class);
private final BaseEventStore _eventStore;
private final JobService _jobService;
private final JobType _moveQueueJobType;
private final LoadingCache> _queueSizeCache;
- private final Meter _sendAllMeterAQS;
- private final Meter _sendAllMeterNullAQS;
-
- private final Meter _pollAQS;
- private final Meter _pollNullAQS;
+ private final KafkaAdminService adminService;
+ private final KafkaProducerService producerService;
+ // Configuration keys for Kafka topic settings
+ private static final Integer TOPIC_PARTITION_COUNT = 1;
+ private static final Short TOPIC_REPLICATION_FACTOR =2;
public static final int MAX_MESSAGE_SIZE_IN_BYTES = 30 * 1024;
+ private final StepFunctionService stepFunctionService;
+ private final ParameterStoreUtil parameterStoreUtil;
protected AbstractQueueService(BaseEventStore eventStore, JobService jobService,
JobHandlerRegistry jobHandlerRegistry,
JobType moveQueueJobType,
- Clock clock, MetricRegistry metricRegistry) {
+ Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
_eventStore = eventStore;
_jobService = jobService;
_moveQueueJobType = moveQueueJobType;
+ this.adminService = adminService;
+ this.producerService = producerService;
+ this.stepFunctionService = stepFunctionService;
+ this.parameterStoreUtil = new ParameterStoreUtil();
- registerMoveQueueJobHandler(jobHandlerRegistry);
+ registerMoveQueueJobHandler(jobHandlerRegistry);
_queueSizeCache = CacheBuilder.newBuilder()
.expireAfterWrite(15, TimeUnit.SECONDS)
.maximumSize(2000)
@@ -77,11 +87,6 @@ public Map.Entry load(SizeCacheKey key)
return Maps.immutableEntry(internalMessageCountUpTo(key.channelName, key.limitAsked), key.limitAsked);
}
});
- _sendAllMeterAQS = metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class, "sendAllAQS"));
- _sendAllMeterNullAQS = metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class, "sendAllNullAQS"));
- _pollAQS= metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class,"pollAQS"));
- _pollNullAQS= metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class,"pollNullAQS"));
-
}
private void registerMoveQueueJobHandler(JobHandlerRegistry jobHandlerRegistry) {
@@ -111,40 +116,124 @@ public MoveQueueResult run(MoveQueueRequest request)
@Override
public void send(String queue, Object message) {
- sendAll(Collections.singletonMap(queue, Collections.singleton(message)));
+ List allowedQueues = fetchAllowedQueues();
+ boolean isExperiment = Boolean.parseBoolean(parameterStoreUtil.getParameter("/emodb/experiment/isExperiment"));
+ if (!isExperiment) {
+ // experiment is over now, send everything to kafka
+ sendAll(Collections.singletonMap(queue, Collections.singleton(message)));
+ } else {
+ // Experiment is still running, check if the queue is allowed
+ if(allowedQueues.contains(queue)){
+ //send kafka , only if its allowed queue
+ sendAll(Collections.singletonMap(queue, Collections.singleton(message)));
+ }
+ else {
+ //send to cassandra, (rollback plan)
+ sendAll(queue, Collections.singleton(message), false);
+ }
+ }
}
@Override
public void sendAll(String queue, Collection> messages) {
- sendAll(Collections.singletonMap(queue, messages));
+ List allowedQueues = fetchAllowedQueues();
+ boolean isExperiment = Boolean.parseBoolean(parameterStoreUtil.getParameter("/emodb/experiment/isExperiment"));
+ if (!isExperiment) {
+ // experiment is over now, send everything to kafka
+ sendAll(Collections.singletonMap(queue, messages));
+ } else {
+ // Experiment is still running, check if the queue is allowed
+ if(allowedQueues.contains(queue)){
+ //send kafka , only if its allowed queue
+ sendAll(Collections.singletonMap(queue, messages));
+ }
+ else {
+ //send to cassandra, (rollback plan)
+ sendAll(queue, messages, false);
+ }
+ }
+ }
+
+
+ private void validateMessage(Object message) {
+ _log.debug("Validating message: {}", message);
+
+ // Check if the message is valid using JsonValidator
+ ByteBuffer messageByteBuffer = MessageSerializer.toByteBuffer(JsonValidator.checkValid(message));
+
+ // Check if the message size exceeds the allowed limit
+ checkArgument(messageByteBuffer.limit() <= MAX_MESSAGE_SIZE_IN_BYTES,
+ "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size");
+
+ _log.debug("Message size is valid. Size: {}", messageByteBuffer.limit());
+ }
+
+ private void validateQueue(String queue, Collection> messages) {
+ requireNonNull(queue, "Queue name cannot be null");
+ requireNonNull(messages, "Messages collection cannot be null");
+
+ // Check if the queue name is legal
+ checkLegalQueueName(queue);
+
+ _log.debug("Queue name '{}' is valid and contains {} messages", queue, messages.size());
}
@Override
public void sendAll(Map> messagesByQueue) {
requireNonNull(messagesByQueue, "messagesByQueue");
- if(messagesByQueue.keySet().isEmpty()){
- _sendAllMeterNullAQS.mark();
- } else {
- _sendAllMeterAQS.mark(messagesByQueue.keySet().size());
- }
- ImmutableMultimap.Builder builder = ImmutableMultimap.builder();
+ ImmutableMultimap.Builder builder = ImmutableMultimap.builder();
for (Map.Entry> entry : messagesByQueue.entrySet()) {
String queue = entry.getKey();
Collection> messages = entry.getValue();
- checkLegalQueueName(queue);
- requireNonNull(messages, "messages");
+ validateQueue(queue, messages);
- List events = Lists.newArrayListWithCapacity(messages.size());
+ List