diff --git a/abc.txt b/abc.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/auth/auth-client/pom.xml b/auth/auth-client/pom.xml index 14e80f2c35..a0ec2a4214 100644 --- a/auth/auth-client/pom.xml +++ b/auth/auth-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/auth/auth-core/pom.xml b/auth/auth-core/pom.xml index 257be185db..a2640a5b68 100644 --- a/auth/auth-core/pom.xml +++ b/auth/auth-core/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/auth/auth-store/pom.xml b/auth/auth-store/pom.xml index 3b9531494e..3914091486 100644 --- a/auth/auth-store/pom.xml +++ b/auth/auth-store/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/auth/auth-util/pom.xml b/auth/auth-util/pom.xml index fe30721176..90c958a022 100644 --- a/auth/auth-util/pom.xml +++ b/auth/auth-util/pom.xml @@ -3,7 +3,7 @@ emodb com.bazaarvoice.emodb - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../pom.xml 4.0.0 diff --git a/blob-api/pom.xml b/blob-api/pom.xml index 85123d5a42..e738315218 100644 --- a/blob-api/pom.xml +++ b/blob-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/blob-clients/blob-client-common/pom.xml b/blob-clients/blob-client-common/pom.xml index 2551dd9b1f..85638eb20d 100644 --- a/blob-clients/blob-client-common/pom.xml +++ b/blob-clients/blob-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/blob-clients/blob-client-jersey2/pom.xml b/blob-clients/blob-client-jersey2/pom.xml index 1469a20558..9dae41c49e 100644 --- a/blob-clients/blob-client-jersey2/pom.xml +++ b/blob-clients/blob-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/blob-clients/blob-client/pom.xml b/blob-clients/blob-client/pom.xml index 8eb86f2ce1..983e1141bb 100644 --- a/blob-clients/blob-client/pom.xml +++ b/blob-clients/blob-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/blob/pom.xml b/blob/pom.xml index e4396c74cf..8a09d259f4 100644 --- a/blob/pom.xml +++ b/blob/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/blob/src/main/java/com/bazaarvoice/emodb/blob/core/DefaultBlobStore.java b/blob/src/main/java/com/bazaarvoice/emodb/blob/core/DefaultBlobStore.java index 8eba374154..6ab72adaa1 100644 --- a/blob/src/main/java/com/bazaarvoice/emodb/blob/core/DefaultBlobStore.java +++ b/blob/src/main/java/com/bazaarvoice/emodb/blob/core/DefaultBlobStore.java @@ -474,4 +474,4 @@ private static void checkLegalBlobId(String blobId) { "Blob IDs must be ASCII strings between 1 and 255 characters in length. " + "Whitespace, ISO control characters and certain punctuation characters that aren't generally allowed in file names are excluded."); } -} +} \ No newline at end of file diff --git a/cachemgr/pom.xml b/cachemgr/pom.xml index c05b60ea79..a556af875a 100644 --- a/cachemgr/pom.xml +++ b/cachemgr/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/common/api/pom.xml b/common/api/pom.xml index b8892b6428..9e3eb49331 100644 --- a/common/api/pom.xml +++ b/common/api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/common/astyanax/pom.xml b/common/astyanax/pom.xml index b6eff20acf..fb947bedb4 100644 --- a/common/astyanax/pom.xml +++ b/common/astyanax/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/common/client-jax-rs-2/pom.xml b/common/client-jax-rs-2/pom.xml index 1f2e6db668..6bbacebeb6 100644 --- a/common/client-jax-rs-2/pom.xml +++ b/common/client-jax-rs-2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/common/client-jersey2/pom.xml b/common/client-jersey2/pom.xml index 1a6fe97c68..9804488145 100644 --- a/common/client-jersey2/pom.xml +++ b/common/client-jersey2/pom.xml @@ -5,7 +5,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/common/client/pom.xml b/common/client/pom.xml index 640eeb51a0..bacc4c21d9 100644 --- a/common/client/pom.xml +++ b/common/client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/common/dropwizard/pom.xml b/common/dropwizard/pom.xml index 8bb610aba2..d799c4406c 100644 --- a/common/dropwizard/pom.xml +++ b/common/dropwizard/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/common/jersey-client/pom.xml b/common/jersey-client/pom.xml index aa9d419736..63c76a2461 100644 --- a/common/jersey-client/pom.xml +++ b/common/jersey-client/pom.xml @@ -5,7 +5,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/common/json/pom.xml b/common/json/pom.xml index 917adb1936..9b280a591f 100644 --- a/common/json/pom.xml +++ b/common/json/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/common/stash/pom.xml b/common/stash/pom.xml index bb9f0e5b7b..252172f0e1 100644 --- a/common/stash/pom.xml +++ b/common/stash/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/common/uuid/pom.xml b/common/uuid/pom.xml index 69c7151265..bc778252ac 100644 --- a/common/uuid/pom.xml +++ b/common/uuid/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/common/zookeeper/pom.xml b/common/zookeeper/pom.xml index 02e713b07c..ada74ff979 100644 --- a/common/zookeeper/pom.xml +++ b/common/zookeeper/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/databus-api/pom.xml b/databus-api/pom.xml index 27ef872a85..af2eaa8064 100644 --- a/databus-api/pom.xml +++ b/databus-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/databus-client-common/pom.xml b/databus-client-common/pom.xml index 5af9b9d379..3a3f4bcb7a 100644 --- a/databus-client-common/pom.xml +++ b/databus-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/databus-client-jersey2/pom.xml b/databus-client-jersey2/pom.xml index 7c2794e645..56c4926fd1 100644 --- a/databus-client-jersey2/pom.xml +++ b/databus-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/databus-client/pom.xml b/databus-client/pom.xml index 46b1d8c25d..725086ecff 100644 --- a/databus-client/pom.xml +++ b/databus-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/databus/pom.xml b/databus/pom.xml index e966f653bb..464a645e31 100644 --- a/databus/pom.xml +++ b/databus/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/datacenter/pom.xml b/datacenter/pom.xml index d0582217b8..ff48171432 100644 --- a/datacenter/pom.xml +++ b/datacenter/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/event/pom.xml b/event/pom.xml index 8e3cd03e8b..b43ec8565c 100644 --- a/event/pom.xml +++ b/event/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/job-api/pom.xml b/job-api/pom.xml index 8ac8b8687e..2f2e764391 100644 --- a/job-api/pom.xml +++ b/job-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/job/pom.xml b/job/pom.xml index e4a44a41d2..a79351ca1f 100644 --- a/job/pom.xml +++ b/job/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/kafka/pom.xml b/kafka/pom.xml index b553098418..43e1a3a6c3 100644 --- a/kafka/pom.xml +++ b/kafka/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/megabus/pom.xml b/megabus/pom.xml index 2c91fd1561..c1e6f3a0b5 100644 --- a/megabus/pom.xml +++ b/megabus/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/parent/pom.xml b/parent/pom.xml index 27d58c9133..6784698424 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -11,7 +11,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT pom EmoDB Parent @@ -635,11 +635,22 @@ aws-java-sdk-sns ${aws-sdk.version} + + com.amazonaws + aws-java-sdk-stepfunctions + ${aws-sdk.version} + com.amazonaws aws-java-sdk-sqs ${aws-sdk.version} + + + com.amazonaws + aws-java-sdk-ssm + ${aws-sdk.version} + com.amazonaws aws-java-sdk-sts diff --git a/plugins/pom.xml b/plugins/pom.xml index e67d3d650d..1cefbd9e21 100644 --- a/plugins/pom.xml +++ b/plugins/pom.xml @@ -4,7 +4,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/pom.xml b/pom.xml index 5f048e3d7b..aba38f7f68 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT parent/pom.xml diff --git a/quality/integration/pom.xml b/quality/integration/pom.xml index f2fca66aa4..9235aea485 100644 --- a/quality/integration/pom.xml +++ b/quality/integration/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../../parent/pom.xml diff --git a/quality/pom.xml b/quality/pom.xml index ffbd280391..5024ecba8f 100644 --- a/quality/pom.xml +++ b/quality/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/queue-api/pom.xml b/queue-api/pom.xml index b7b7eb2b33..a081b19e3f 100644 --- a/queue-api/pom.xml +++ b/queue-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthDedupQueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthDedupQueueService.java index 41991d5b95..3b1fd2ffde 100644 --- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthDedupQueueService.java +++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthDedupQueueService.java @@ -64,4 +64,6 @@ public interface AuthDedupQueueService { /** Delete all messages in the queue, for debugging/testing. */ void purge(@Credential String apiKey, String queue); + + void sendAll(String apiKey, String queue, Collection messages, boolean isFlush); } diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthQueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthQueueService.java index a077eb2062..1bae1893f1 100644 --- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthQueueService.java +++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthQueueService.java @@ -18,6 +18,7 @@ public interface AuthQueueService { void sendAll(@Credential String apiKey, String queue, Collection messages); void sendAll(@Credential String apiKey, Map> messagesByQueue); + void sendAll(@Credential String apiKey, String queue, Collection messages, boolean isFlush); /** * Counts pending messages for the specified queue. The count will include messages that are currently claimed @@ -64,4 +65,6 @@ public interface AuthQueueService { /** Delete all messages in the queue, for debugging/testing. */ void purge(@Credential String apiKey, String queue); + + } diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java index 3fcd38b5a4..4b0af997f9 100644 --- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java +++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java @@ -1,5 +1,6 @@ package com.bazaarvoice.emodb.queue.api; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collection; import java.util.List; @@ -15,6 +16,8 @@ public interface BaseQueueService { void sendAll(Map> messagesByQueue); + void sendAll(String queue, Collection messages, boolean isFlush); + /** * Counts pending messages for the specified queue. The count will include messages that are currently claimed * and not returned by the {@link #poll} method. diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java index 12ab97a45a..a6dc77515b 100644 --- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java +++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java @@ -15,6 +15,8 @@ public interface DedupQueueService extends BaseQueueService { void sendAll(Map> messagesByQueue); + void sendAll(String queue, Collectionmessages, boolean isFlush); + /** * Counts pending messages for the specified queue. The count will include messages that are currently claimed * and not returned by the {@link #poll} method. diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java index c87740330c..4d533c755e 100644 --- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java +++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java @@ -13,8 +13,12 @@ public interface QueueService extends BaseQueueService { void sendAll(String queue, Collection messages); + void sendAll(Map> messagesByQueue); + //Overloaded sendAll method to send to cassandra + void sendAll(String queue, Collection messages, boolean isFlush); + /** * Counts pending messages for the specified queue. The count will include messages that are currently claimed * and not returned by the {@link #poll} method. diff --git a/queue-client-common/pom.xml b/queue-client-common/pom.xml index d71aae5218..12a93a7f62 100644 --- a/queue-client-common/pom.xml +++ b/queue-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java index b8aaa83667..7e01cd8b91 100644 --- a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java +++ b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java @@ -75,6 +75,25 @@ public void sendAll(String apiKey, String queue, Collection messages) { } } + public void sendAll(String apiKey, String queue, Collection messages, boolean isFlush) { + requireNonNull(queue, "queue"); + requireNonNull(messages, "messages"); + if (messages.isEmpty()) { + return; + } + try { + URI uri = _queueService.clone() + .segment(queue, "sendbatch") + .build(); + _client.resource(uri) + .type(MediaType.APPLICATION_JSON_TYPE) + .header(ApiKeyRequest.AUTHENTICATION_HEADER, apiKey) + .post(messages); + } catch (EmoClientException e) { + throw convertException(e); + } + } + // Any server can handle sending messages, no need for @PartitionKey public void sendAll(String apiKey, Map> messagesByQueue) { requireNonNull(messagesByQueue, "messagesByQueue"); diff --git a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java index 49d4c240f2..01fa7830eb 100644 --- a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java +++ b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java @@ -38,6 +38,11 @@ public void sendAll(Map> messagesByQueue) { _authDedupQueueService.sendAll(_apiKey, messagesByQueue); } + @Override + public void sendAll(String queue, Collection messages, boolean isFlush) { + _authDedupQueueService.sendAll(_apiKey, queue, messages, isFlush); + } + @Override public MoveQueueStatus getMoveStatus(String reference) { return _authDedupQueueService.getMoveStatus(_apiKey, reference); diff --git a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueClient.java b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueClient.java index 683af2162d..92769441dd 100644 --- a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueClient.java +++ b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueClient.java @@ -32,6 +32,7 @@ public QueueClient(URI endPoint, boolean partitionSafe, EmoClient client) { super(endPoint, partitionSafe, client); } + @Override public long getMessageCount(String apiKey, String queue) { // Any server can handle this request, no need for @PartitionKey diff --git a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java index 29f8fd4ae6..714897a36e 100644 --- a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java +++ b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java @@ -38,6 +38,11 @@ public void sendAll(Map> messagesByQueue) { _authQueueService.sendAll(_apiKey, messagesByQueue); } + @Override + public void sendAll(String queue, Collection messages, boolean isFlush) { + _authQueueService.sendAll(_apiKey, queue, messages, isFlush); + } + @Override public MoveQueueStatus getMoveStatus(String reference) { return _authQueueService.getMoveStatus(_apiKey, reference); diff --git a/queue-client-jersey2/pom.xml b/queue-client-jersey2/pom.xml index 2eb7d7ceef..ccf5ec3441 100644 --- a/queue-client-jersey2/pom.xml +++ b/queue-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java index bd4859d58f..7e315932b7 100644 --- a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java +++ b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java @@ -67,6 +67,23 @@ public void sendAll(String apiKey, String queue, Collection messages) { .post(messages)); } + public void sendAll(String apiKey, String queue, Collection messages, boolean isFlush) { + requireNonNull(queue, "queue"); + requireNonNull(messages, "messages"); + if (messages.isEmpty()) { + return; + } + URI uri = _queueService.clone() + .segment(queue, "sendbatch") + .build(); + + Failsafe.with(_retryPolicy) + .run(() -> _client.resource(uri) + .type(MediaType.APPLICATION_JSON_TYPE) + .header(ApiKeyRequest.AUTHENTICATION_HEADER, apiKey) + .post(messages)); + } + public void sendAll(String apiKey, Map> messagesByQueue) { requireNonNull(messagesByQueue, "messagesByQueue"); if (messagesByQueue.isEmpty()) { diff --git a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java index f37405182b..19df050f64 100644 --- a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java +++ b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java @@ -36,6 +36,11 @@ public void sendAll(Map> messagesByQueue) { _authDedupQueueService.sendAll(_apiKey, messagesByQueue); } + @Override + public void sendAll(String queue, Collection messages, boolean isFlush) { + _authDedupQueueService.sendAll(_apiKey, queue, messages, isFlush); + } + @Override public MoveQueueStatus getMoveStatus(String reference) { return _authDedupQueueService.getMoveStatus(_apiKey, reference); diff --git a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java index 144b991f32..fef04a42e1 100644 --- a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java +++ b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java @@ -35,6 +35,11 @@ public void sendAll(Map> messagesByQueue) { _authQueueService.sendAll(_apiKey, messagesByQueue); } + @Override + public void sendAll(String queue, Collection messages, boolean isFlush) { + _authQueueService.sendAll(_apiKey, queue, messages, isFlush); + } + @Override public MoveQueueStatus getMoveStatus(String reference) { return _authQueueService.getMoveStatus(_apiKey, reference); diff --git a/queue-client/pom.xml b/queue-client/pom.xml index 53a53dc625..564a3eb5be 100644 --- a/queue-client/pom.xml +++ b/queue-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/queue/pom.xml b/queue/pom.xml index c38a0b5332..3fd13f9124 100644 --- a/queue/pom.xml +++ b/queue/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml @@ -75,20 +75,24 @@ com.fasterxml.jackson.core jackson-annotations + + com.fasterxml.jackson.core + jackson-core + com.fasterxml.jackson.core jackson-databind ${jackson.databind.version} - - - com.fasterxml.jackson.core - jackson-core - - - com.fasterxml.jackson.core - jackson-annotations - - + + + + + + + + + + javax.validation @@ -98,6 +102,11 @@ org.apache.curator curator-framework + + org.slf4j + slf4j-api + + @@ -110,5 +119,21 @@ testng test + + org.apache.kafka + kafka-clients + + + com.amazonaws + aws-java-sdk-core + + + com.amazonaws + aws-java-sdk-stepfunctions + + + com.amazonaws + aws-java-sdk-ssm + diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java index 20ff816a5c..b42d33b02c 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java @@ -21,6 +21,9 @@ import com.bazaarvoice.emodb.queue.core.DefaultDedupQueueService; import com.bazaarvoice.emodb.queue.core.DefaultQueueService; import com.bazaarvoice.emodb.queue.core.QueueChannelConfiguration; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; +import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; import com.bazaarvoice.ostrich.HostDiscovery; import com.codahale.metrics.MetricRegistry; import com.google.common.base.Supplier; @@ -82,6 +85,14 @@ protected void configure() { bind(new TypeLiteral>() {}).annotatedWith(DedupEnabled.class).toInstance(Suppliers.ofInstance(true)); install(new EventStoreModule("bv.emodb.queue", _metricRegistry)); + // Bind Kafka services + bind (KafkaAdminService.class).asEagerSingleton(); + bind(KafkaProducerService.class).asEagerSingleton(); + + // Bind Step Function Service + bind(StepFunctionService.class).asEagerSingleton(); + + // Bind the Queue instance that the rest of the application will consume bind(QueueService.class).to(DefaultQueueService.class).asEagerSingleton(); expose(QueueService.class); diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java index f07083616d..7a1f511fd8 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java @@ -17,9 +17,13 @@ import com.bazaarvoice.emodb.queue.api.MoveQueueStatus; import com.bazaarvoice.emodb.queue.api.Names; import com.bazaarvoice.emodb.queue.api.UnknownMoveException; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; +import com.bazaarvoice.emodb.queue.core.ssm.ParameterStoreUtil; +import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; import com.bazaarvoice.emodb.sortedq.core.ReadOnlyQueueException; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.cache.CacheBuilder; @@ -33,39 +37,45 @@ import java.nio.ByteBuffer; import java.time.Clock; import java.time.Duration; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + abstract class AbstractQueueService implements BaseQueueService { + private final Logger _log = LoggerFactory.getLogger(AbstractQueueService.class); private final BaseEventStore _eventStore; private final JobService _jobService; private final JobType _moveQueueJobType; private final LoadingCache> _queueSizeCache; - private final Meter _sendAllMeterAQS; - private final Meter _sendAllMeterNullAQS; - - private final Meter _pollAQS; - private final Meter _pollNullAQS; + private final KafkaAdminService adminService; + private final KafkaProducerService producerService; + // Configuration keys for Kafka topic settings + private static final Integer TOPIC_PARTITION_COUNT = 1; + private static final Short TOPIC_REPLICATION_FACTOR =2; public static final int MAX_MESSAGE_SIZE_IN_BYTES = 30 * 1024; + private final StepFunctionService stepFunctionService; + private final ParameterStoreUtil parameterStoreUtil; protected AbstractQueueService(BaseEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, JobType moveQueueJobType, - Clock clock, MetricRegistry metricRegistry) { + Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) { _eventStore = eventStore; _jobService = jobService; _moveQueueJobType = moveQueueJobType; + this.adminService = adminService; + this.producerService = producerService; + this.stepFunctionService = stepFunctionService; + this.parameterStoreUtil = new ParameterStoreUtil(); - registerMoveQueueJobHandler(jobHandlerRegistry); + registerMoveQueueJobHandler(jobHandlerRegistry); _queueSizeCache = CacheBuilder.newBuilder() .expireAfterWrite(15, TimeUnit.SECONDS) .maximumSize(2000) @@ -77,11 +87,6 @@ public Map.Entry load(SizeCacheKey key) return Maps.immutableEntry(internalMessageCountUpTo(key.channelName, key.limitAsked), key.limitAsked); } }); - _sendAllMeterAQS = metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class, "sendAllAQS")); - _sendAllMeterNullAQS = metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class, "sendAllNullAQS")); - _pollAQS= metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class,"pollAQS")); - _pollNullAQS= metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class,"pollNullAQS")); - } private void registerMoveQueueJobHandler(JobHandlerRegistry jobHandlerRegistry) { @@ -111,40 +116,124 @@ public MoveQueueResult run(MoveQueueRequest request) @Override public void send(String queue, Object message) { - sendAll(Collections.singletonMap(queue, Collections.singleton(message))); + List allowedQueues = fetchAllowedQueues(); + boolean isExperiment = Boolean.parseBoolean(parameterStoreUtil.getParameter("/emodb/experiment/isExperiment")); + if (!isExperiment) { + // experiment is over now, send everything to kafka + sendAll(Collections.singletonMap(queue, Collections.singleton(message))); + } else { + // Experiment is still running, check if the queue is allowed + if(allowedQueues.contains(queue)){ + //send kafka , only if its allowed queue + sendAll(Collections.singletonMap(queue, Collections.singleton(message))); + } + else { + //send to cassandra, (rollback plan) + sendAll(queue, Collections.singleton(message), false); + } + } } @Override public void sendAll(String queue, Collection messages) { - sendAll(Collections.singletonMap(queue, messages)); + List allowedQueues = fetchAllowedQueues(); + boolean isExperiment = Boolean.parseBoolean(parameterStoreUtil.getParameter("/emodb/experiment/isExperiment")); + if (!isExperiment) { + // experiment is over now, send everything to kafka + sendAll(Collections.singletonMap(queue, messages)); + } else { + // Experiment is still running, check if the queue is allowed + if(allowedQueues.contains(queue)){ + //send kafka , only if its allowed queue + sendAll(Collections.singletonMap(queue, messages)); + } + else { + //send to cassandra, (rollback plan) + sendAll(queue, messages, false); + } + } + } + + + private void validateMessage(Object message) { + _log.debug("Validating message: {}", message); + + // Check if the message is valid using JsonValidator + ByteBuffer messageByteBuffer = MessageSerializer.toByteBuffer(JsonValidator.checkValid(message)); + + // Check if the message size exceeds the allowed limit + checkArgument(messageByteBuffer.limit() <= MAX_MESSAGE_SIZE_IN_BYTES, + "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size"); + + _log.debug("Message size is valid. Size: {}", messageByteBuffer.limit()); + } + + private void validateQueue(String queue, Collection messages) { + requireNonNull(queue, "Queue name cannot be null"); + requireNonNull(messages, "Messages collection cannot be null"); + + // Check if the queue name is legal + checkLegalQueueName(queue); + + _log.debug("Queue name '{}' is valid and contains {} messages", queue, messages.size()); } @Override public void sendAll(Map> messagesByQueue) { requireNonNull(messagesByQueue, "messagesByQueue"); - if(messagesByQueue.keySet().isEmpty()){ - _sendAllMeterNullAQS.mark(); - } else { - _sendAllMeterAQS.mark(messagesByQueue.keySet().size()); - } - ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); + ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); for (Map.Entry> entry : messagesByQueue.entrySet()) { String queue = entry.getKey(); Collection messages = entry.getValue(); - checkLegalQueueName(queue); - requireNonNull(messages, "messages"); + validateQueue(queue, messages); - List events = Lists.newArrayListWithCapacity(messages.size()); + List events = Lists.newArrayListWithCapacity(messages.size()); + + // Validate each message for (Object message : messages) { - ByteBuffer messageByteBuffer = MessageSerializer.toByteBuffer(JsonValidator.checkValid(message)); - checkArgument(messageByteBuffer.limit() <= MAX_MESSAGE_SIZE_IN_BYTES, "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size"); + validateMessage(message); + events.add(message); + } + builder.putAll(queue, String.valueOf(events)); + } - events.add(messageByteBuffer); + Multimap eventsByChannel = builder.build(); + + String queueType = determineQueueType(); + for (Map.Entry> topicEntry : eventsByChannel.asMap().entrySet()) { + String queueName= topicEntry.getKey(); + String topic = "dsq_" + (("dedup".equals(queueType)) ? "dedup_" + queueName : queueName); + // Check if the topic exists, if not create it and execute Step Function + if (!adminService.createTopicIfNotExists(topic, TOPIC_PARTITION_COUNT, TOPIC_REPLICATION_FACTOR, queueType)) { + Map parameters = fetchStepFunctionParameters(); + // Execute Step Function after topic creation + startStepFunctionExecution(parameters, queueType,queueName, topic); } - builder.putAll(queue, events); + producerService.sendMessages(topic, topicEntry.getValue(), queueType); + _log.info("Messages sent to topic: {}", topic); + } + _log.info("All messages have been sent to their respective queues."); + } + + @Override + public void sendAll(String queue, Collection messages, boolean fromKafka) { + //incoming message from kafka consume, save to cassandra + if(!fromKafka){ + validateQueue(queue, messages); + } + ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); + List events = Lists.newArrayListWithCapacity(messages.size()); + + + for (Object message : messages) { + ByteBuffer messageByteBuffer = MessageSerializer.toByteBuffer(JsonValidator.checkValid(message)); + checkArgument(messageByteBuffer.limit() <= MAX_MESSAGE_SIZE_IN_BYTES, + "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size"); + events.add(messageByteBuffer); } + builder.putAll(queue, events); Multimap eventsByChannel = builder.build(); _eventStore.addAll(eventsByChannel); @@ -196,14 +285,8 @@ public List poll(String queue, Duration claimTtl, int limit) { checkLegalQueueName(queue); checkArgument(claimTtl.toMillis() >= 0, "ClaimTtl must be >=0"); checkArgument(limit > 0, "Limit must be >0"); - List response = toMessages(_eventStore.poll(queue, claimTtl, limit)); - if(response.isEmpty()){ - _pollNullAQS.mark(); - } - else{ - _pollAQS.mark(response.size()); - } - return response; + + return toMessages(_eventStore.poll(queue, claimTtl, limit)); } @Override @@ -298,4 +381,92 @@ private void checkLegalQueueName(String queue) { "Allowed punctuation characters are -.:@_ and the queue name may not start with a single underscore character. " + "An example of a valid table name would be 'polloi:provision'."); } -} + /** + * Fetches the necessary Step Function parameters from AWS Parameter Store. + */ + private Map fetchStepFunctionParameters() { + List parameterNames = Arrays.asList( + "/emodb/stepfn/stateMachineArn", + "/emodb/stepfn/queueThreshold", + "/emodb/stepfn/batchSize", + "/emodb/stepfn/interval" + ); + + try { + return parameterStoreUtil.getParameters(parameterNames); + } catch (Exception e) { + _log.error("Failed to fetch Step Function parameters from Parameter Store", e); + throw new RuntimeException("Error fetching Step Function parameters", e); + } + } + + /** + * Executes the Step Function for a given topic after it has been created. + */ + + private void startStepFunctionExecution(Map parameters, String queueType, String queueName, String topic) { + try { + String stateMachineArn = parameters.get("/emodb/stepfn/stateMachineArn"); + int queueThreshold = Integer.parseInt(parameters.get("/emodb/stepfn/queueThreshold")); + int batchSize = Integer.parseInt(parameters.get("/emodb/stepfn/batchSize")); + int interval = Integer.parseInt(parameters.get("/emodb/stepfn/interval")); + + String inputPayload = createInputPayload(queueThreshold, batchSize, queueType, queueName, topic, interval); + + // Create the timestamp + String timestamp = String.valueOf(System.currentTimeMillis()); // Current time in milliseconds + + // Check if queueType is "dedup" and prepend "D" to execution name if true + String executionName = (queueType.equalsIgnoreCase("dedup") ? "D_" : "") + queueName + "_" + timestamp; + + // Start the Step Function execution + stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName); + + _log.info("Step Function executed for topic: {} with executionName: {}", topic, executionName); + } catch (Exception e) { + _log.error("Error executing Step Function for topic: {}", topic, e); + throw new RuntimeException("Error executing Step Function for topic: " + topic, e); + } + } + + /** + * Determines the queue type based on the event store. + */ + private String determineQueueType() { + if (_eventStore.getClass().getName().equals("com.bazaarvoice.emodb.event.dedup.DefaultDedupEventStore")) { + return "dedup"; + } + return "queue"; + } + + private List fetchAllowedQueues() { + try { + // Fetch the 'allowedQueues' parameter using ParameterStoreUtil + String allowedQueuesStr = parameterStoreUtil.getParameter("/emodb/experiment/allowedQueues"); + return Arrays.asList(allowedQueuesStr.split(",")); + } catch (Exception e) { + // Handle the case when the parameter is not found or fetching fails + _log.error("Error fetching allowedQueues: " + e.getMessage()); + return Collections.singletonList(""); // Default to an empty list if the parameter is missing + } + } + + private String createInputPayload(int queueThreshold, int batchSize, String queueType, String queueName, String topicName, int interval) { + Map payloadData = new HashMap<>(); + payloadData.put("queueThreshold", queueThreshold); + payloadData.put("batchSize", batchSize); + payloadData.put("queueType", queueType); + payloadData.put("queueName", queueName); + payloadData.put("topicName", topicName); + payloadData.put("interval", interval); + Map wrappedData = new HashMap<>(); + wrappedData.put("executionInput", payloadData); // Wrap the data + try { + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.writeValueAsString(wrappedData); // Convert wrapped data to JSON + } catch (JsonProcessingException e) { + _log.error("Error while converting map to JSON", e); + return "{}"; // Return empty JSON object on error + } + } +} \ No newline at end of file diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java index 0955166218..9ec8d36606 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java @@ -4,7 +4,9 @@ import com.bazaarvoice.emodb.job.api.JobHandlerRegistry; import com.bazaarvoice.emodb.job.api.JobService; import com.bazaarvoice.emodb.queue.api.DedupQueueService; -import com.codahale.metrics.MetricRegistry; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; +import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; import com.google.inject.Inject; import java.time.Clock; @@ -12,7 +14,7 @@ public class DefaultDedupQueueService extends AbstractQueueService implements DedupQueueService { @Inject public DefaultDedupQueueService(DedupEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, - Clock clock, MetricRegistry metricRegistry) { - super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock, metricRegistry); + Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) { + super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock,adminService,producerService,stepFunctionService ); } } diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java index 947572208d..524ca50033 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java @@ -4,7 +4,9 @@ import com.bazaarvoice.emodb.job.api.JobHandlerRegistry; import com.bazaarvoice.emodb.job.api.JobService; import com.bazaarvoice.emodb.queue.api.QueueService; -import com.codahale.metrics.MetricRegistry; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; +import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; import com.google.inject.Inject; import java.time.Clock; @@ -12,7 +14,7 @@ public class DefaultQueueService extends AbstractQueueService implements QueueService { @Inject public DefaultQueueService(EventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, - Clock clock, MetricRegistry metricRegistry) { - super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock, metricRegistry); + Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) { + super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock,adminService, producerService,stepFunctionService); } } diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java index 47e24ccf38..b349b19298 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java @@ -91,6 +91,11 @@ public void purge(String apiKey, String queue) { _dedupQueueService.purge(queue); } + @Override + public void sendAll(String apiKey, String queue, Collection messages, boolean isFlush) { + _dedupQueueService.sendAll(queue, messages, isFlush); + } + @Override public void sendAll(String apiKey, Map> messagesByQueue) { _dedupQueueService.sendAll(messagesByQueue); diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java index 5ceea10a8e..cdafc8935e 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java @@ -95,4 +95,9 @@ public void purge(String apiKey, String queue) { public void sendAll(String apiKey, Map> messagesByQueue) { _queueService.sendAll(messagesByQueue); } + + @Override + public void sendAll(String apiKey, String queue, Collection messages, boolean isFlush) { + + } } diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaAdminService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaAdminService.java new file mode 100644 index 0000000000..cf4ee2c9d0 --- /dev/null +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaAdminService.java @@ -0,0 +1,60 @@ +package com.bazaarvoice.emodb.queue.core.kafka; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; + +public class KafkaAdminService { + private static final Logger _log = LoggerFactory.getLogger(KafkaAdminService.class); + private final AdminClient adminClient; + + public KafkaAdminService() { + this.adminClient = AdminClient.create(KafkaConfig.getAdminProps()); + } + + /** + * Creates a new Kafka topic with the specified configurations. + * + * @param topic The name of the topic. + * @param numPartitions Number of partitions. + * @param replicationFactor Replication factor. + */ + public Boolean createTopicIfNotExists(String topic, int numPartitions, short replicationFactor, String queueType) { + Boolean isExisting =isTopicExists(topic); + if (! isExisting) { + //create the topic now + NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor); + try { + adminClient.createTopics(Collections.singleton(newTopic)).all().get(); + _log.info("Created topic: {} with numPartitions: {} ", topic, numPartitions, replicationFactor); + } catch (Exception e) { + _log.error("Error creating topic {}: {}", topic, e.getMessage()); + throw new RuntimeException(e); + } + } + return isExisting; + } + + + /** + * Determines if a topic already exists in AWS MSK + * @param topic The name of the topic. + */ + public boolean isTopicExists(String topic) { + try { + return adminClient.listTopics().names().get().contains(topic); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Closes the AdminClient to release resources. + */ + public void close() { + adminClient.close(); + } +} \ No newline at end of file diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java new file mode 100644 index 0000000000..7a15a4c41c --- /dev/null +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java @@ -0,0 +1,129 @@ +package com.bazaarvoice.emodb.queue.core.kafka; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement; +import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder; +import com.amazonaws.services.simplesystemsmanagement.model.AWSSimpleSystemsManagementException; +import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest; +import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult; +import com.amazonaws.services.simplesystemsmanagement.model.Parameter; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +public class KafkaConfig { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class); + + // Static SSM Client and configuration using AWS SDK v1 + private static final AWSSimpleSystemsManagement ssmClient = AWSSimpleSystemsManagementClientBuilder + .standard() + .build(); + + private static final String DEFAULT_BOOTSTRAP_SERVERS = + "b-1.qaemodbpocmsk.q4panq.c10.kafka.us-east-1.amazonaws.com:9092," + + "b-2.qaemodbpocmsk.q4panq.c10.kafka.us-east-1.amazonaws.com:9092"; + + private static String bootstrapServersConfig; + private static String batchSizeConfig; + private static String retriesConfig; + private static String lingerMsConfig; + + static { + try { + // Load configurations from SSM during static initialization + Map parameterValues = getParameterValues( + Arrays.asList( + "/emodb/kafka/batchSize", + "/emodb/kafka/retries", + "/emodb/kafka/lingerMs", + "/emodb/kafka/bootstrapServers" + ) + ); + + // Set configurations with fallback to defaults if not present + // Sets the batch size for Kafka producer, which controls the amount of data to batch before sending. + batchSizeConfig = parameterValues.getOrDefault("/emodb/kafka/batchSize", "16384"); + + // Sets the number of retry attempts for failed Kafka message sends. + retriesConfig = parameterValues.getOrDefault("/emodb/kafka/retries", "3"); + + // Sets the number of milliseconds a producer is willing to wait before sending a batch out + lingerMsConfig = parameterValues.getOrDefault("/emodb/kafka/lingerMs", "1"); + + // Configures the Kafka broker addresses for producer connections. + bootstrapServersConfig = parameterValues.getOrDefault("/emodb/kafka/bootstrapServers", DEFAULT_BOOTSTRAP_SERVERS); + + logger.info("Kafka configurations loaded successfully from SSM."); + } catch (AmazonServiceException e) { + logger.error("Failed to load configurations from SSM. Using default values.", e); + throw e; + } + catch (Exception e) { + logger.error("Unexpected error occurred while loading configurations from SSM. Using default values.", e); + throw e; + } + } + + // Fetch parameters from AWS SSM using AWS SDK v1 + private static Map getParameterValues(List parameterNames) { + try { + GetParametersRequest request = new GetParametersRequest() + .withNames(parameterNames) + .withWithDecryption(true); + + GetParametersResult response = ssmClient.getParameters(request); + + return response.getParameters().stream() + .collect(Collectors.toMap(Parameter::getName, Parameter::getValue)); + } catch (AWSSimpleSystemsManagementException e) { + logger.error("Error fetching parameters from SSM.", e); + throw e; // Rethrow or handle the exception if necessary + } + } + + // Kafka Producer properties + public static Properties getProducerProps() { + Properties producerProps = new Properties(); + + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); + producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retriesConfig)); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, Integer.parseInt(lingerMsConfig)); + producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.parseInt(batchSizeConfig)); + producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // Default buffer memory setting + logger.info("Kafka Producer properties initialized."); + return producerProps; + } + + // Kafka Admin properties + public static Properties getAdminProps() { + Properties adminProps = new Properties(); + + adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); + logger.info("Kafka Admin properties initialized."); + return adminProps; + } + + // Ensure the SSM client is closed when the application shuts down + public static void shutdown() { + if (ssmClient != null) { + try { + ssmClient.shutdown(); + logger.info("SSM client closed successfully."); + } catch (Exception e) { + logger.error("Error while closing SSM client.", e); + } + } + } +} diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java new file mode 100644 index 0000000000..4219701734 --- /dev/null +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java @@ -0,0 +1,66 @@ +package com.bazaarvoice.emodb.queue.core.kafka; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.concurrent.Future; + +public class KafkaProducerService { + private static final Logger _log = LoggerFactory.getLogger(KafkaProducerService.class); + private final KafkaProducer producer; // Changed to String + + public KafkaProducerService() { + this.producer = new KafkaProducer<>(KafkaConfig.getProducerProps()); + _log.info("KafkaProducerService initialized with producer properties: {}", KafkaConfig.getProducerProps()); + } + + /** + * Sends each message from the collection to the specified Kafka topic separately. + * + * @param topic The Kafka topic. + * @param events The collection of messages to be sent. + */ + public void sendMessages(String topic, Collection events, String queueType) { + _log.info("Sending {} messages to topic '{}'", events.size(), topic); + for (String event : events) { + sendMessage(topic, event,queueType); + } + _log.info("Finished sending messages to topic '{}'", topic); + } + + /** + * Sends a single message to the specified Kafka topic. + * + * @param topic The Kafka topic. + * @param message The message to be sent. + */ + public void sendMessage(String topic, String message, String queueType) { + ProducerRecord record = new ProducerRecord<>(topic, message); + try { + Future future = producer.send(record, (metadata, exception) -> { + if (exception != null) { + _log.error("Failed to send message to topic '{}'. Error: {}", topic, exception.getMessage()); + } + }); + // Optionally, you can wait for the send to complete + RecordMetadata metadata = future.get(); // Blocking call + } catch (Exception e) { + _log.error("Failed to send message to topic '{}'. Exception: {}", topic, e.getMessage()); + throw new RuntimeException("Error sending message to kafka"+e.getMessage()); + } + } + + /** + * Closes the producer to release resources. + */ + public void close() { + _log.info("Closing Kafka producer."); + producer.flush(); + producer.close(); + _log.info("Kafka producer closed."); + } +} \ No newline at end of file diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java new file mode 100644 index 0000000000..6f3ee6f6d1 --- /dev/null +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java @@ -0,0 +1,112 @@ +package com.bazaarvoice.emodb.queue.core.ssm; + +import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement; +import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder; +import com.amazonaws.services.simplesystemsmanagement.model.GetParameterRequest; +import com.amazonaws.services.simplesystemsmanagement.model.GetParameterResult; +import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest; +import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult; +import com.amazonaws.services.simplesystemsmanagement.model.ParameterNotFoundException; +import com.amazonaws.services.simplesystemsmanagement.model.AWSSimpleSystemsManagementException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Utility class for interacting with AWS Parameter Store using AWS SDK v1. + */ +public class ParameterStoreUtil { + + private static final Logger logger = LoggerFactory.getLogger(ParameterStoreUtil.class); + private final AWSSimpleSystemsManagement ssmClient; + + /** + * Constructor to initialize the SSM client + */ + public ParameterStoreUtil() { + // Create SSM client with default credentials and region + ssmClient = AWSSimpleSystemsManagementClientBuilder.standard() + .build(); + } + + /** + * Fetches a parameter from AWS Parameter Store. + * + * @param parameterName The name of the parameter to fetch + * @return The value of the parameter + * @throws IllegalArgumentException If the parameterName is null or empty + */ + public String getParameter(String parameterName) { + if (parameterName == null || parameterName.isEmpty()) { + logger.error("Parameter name cannot be null or empty"); + throw new IllegalArgumentException("Parameter name cannot be null or empty"); + } + + try { + //logger.info("Fetching parameter from AWS Parameter Store: {}", parameterName); + + GetParameterRequest request = new GetParameterRequest().withName(parameterName); + GetParameterResult result = ssmClient.getParameter(request); + + //logger.info("Successfully retrieved parameter: {}", parameterName); + return result.getParameter().getValue(); + + } catch (ParameterNotFoundException e) { + logger.error("Parameter not found: {}", parameterName, e); + throw new RuntimeException("Parameter not found: " + parameterName, e); + + } catch (AWSSimpleSystemsManagementException e) { + logger.error("Error fetching parameter from AWS SSM: {}", e.getMessage(), e); + throw new RuntimeException("Error fetching parameter from AWS SSM: " + parameterName, e); + + } catch (Exception e) { + logger.error("Unexpected error while fetching parameter: {}", parameterName, e); + throw new RuntimeException("Unexpected error fetching parameter: " + parameterName, e); + } + } + + /** + * Fetches multiple parameters from AWS Parameter Store in a batch. + * + * @param parameterNames The list of parameter names to fetch + * @return A map of parameter names to their values + * @throws IllegalArgumentException If the parameterNames list is null or empty + */ + public Map getParameters(List parameterNames) { + if (parameterNames == null || parameterNames.isEmpty()) { + logger.error("Parameter names list cannot be null or empty"); + throw new IllegalArgumentException("Parameter names list cannot be null or empty"); + } + + try { + logger.info("Fetching parameters from AWS Parameter Store: {}", parameterNames); + + GetParametersRequest request = new GetParametersRequest().withNames(parameterNames); + GetParametersResult result = ssmClient.getParameters(request); + + // Map the result to a Map of parameter names and values + Map parameters = new HashMap<>(); + result.getParameters().forEach(param -> parameters.put(param.getName(), param.getValue())); + + // Log any parameters that were not found + if (!result.getInvalidParameters().isEmpty()) { + logger.warn("The following parameters were not found: {}", result.getInvalidParameters()); + } + + logger.info("Successfully retrieved {} parameters", parameters.size()); + return parameters; + + } catch (AWSSimpleSystemsManagementException e) { + logger.error("Error fetching parameters from AWS SSM: {}", e.getMessage(), e); + throw new RuntimeException("Error fetching parameters from AWS SSM: " + parameterNames, e); + + } catch (Exception e) { + logger.error("Unexpected error while fetching parameters: {}", parameterNames, e); + throw new RuntimeException("Unexpected error fetching parameters: " + parameterNames, e); + } + } + +} diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java new file mode 100644 index 0000000000..580b48efb3 --- /dev/null +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java @@ -0,0 +1,66 @@ +package com.bazaarvoice.emodb.queue.core.stepfn; + + +import com.amazonaws.services.stepfunctions.AWSStepFunctions; +import com.amazonaws.services.stepfunctions.AWSStepFunctionsClientBuilder; +import com.amazonaws.services.stepfunctions.model.StartExecutionRequest; +import com.amazonaws.services.stepfunctions.model.StartExecutionResult; +import com.amazonaws.services.stepfunctions.model.StateMachineDoesNotExistException; +import com.amazonaws.services.stepfunctions.model.InvalidArnException; +import com.amazonaws.services.stepfunctions.model.InvalidExecutionInputException; +import com.amazonaws.services.stepfunctions.model.AWSStepFunctionsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to interact with AWS Step Functions using AWS SDK v1. + */ +public class StepFunctionService { + + private static final Logger logger = LoggerFactory.getLogger(StepFunctionService.class); + + private final AWSStepFunctions stepFunctionsClient; + + /** + * Constructor to initialize Step Function Client with AWS region and credentials. + */ + public StepFunctionService() { + this.stepFunctionsClient = AWSStepFunctionsClientBuilder.standard() + .build(); + } + + /** + * Starts the execution of a Step Function with the given state machine ARN and input payload. + * + * @param stateMachineArn ARN of the state machine + * @param inputPayload Input for the state machine execution + * @throws IllegalArgumentException If the stateMachineArn is invalid + */ + public void startExecution(String stateMachineArn, String inputPayload, String executionName) { + if (stateMachineArn == null || stateMachineArn.isEmpty()) { + logger.error("State Machine ARN cannot be null or empty"); + throw new IllegalArgumentException("State Machine ARN cannot be null or empty"); + } + + if (inputPayload == null) { + logger.warn("Input payload is null; using empty JSON object"); + inputPayload = "{}"; // Default to empty payload if null + } + + try { + StartExecutionRequest startExecutionRequest = new StartExecutionRequest() + .withStateMachineArn(stateMachineArn) + .withInput(inputPayload) + .withName(executionName); + + StartExecutionResult startExecutionResult = stepFunctionsClient.startExecution(startExecutionRequest); + + logger.info("Successfully started execution for state machine ARN: {}", stateMachineArn); + logger.debug("Execution ARN: {}", startExecutionResult.getExecutionArn()); + + } catch (Exception e) { + logger.error("Unexpected error occurred during Step Function execution: {}", e.getMessage(), e); + throw e; + } + } +} diff --git a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java index 90a0ea837d..0ac9a0ed89 100644 --- a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java +++ b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java @@ -4,7 +4,10 @@ import com.bazaarvoice.emodb.job.api.JobHandlerRegistry; import com.bazaarvoice.emodb.job.api.JobService; import com.bazaarvoice.emodb.job.api.JobType; -import com.codahale.metrics.MetricRegistry; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; +import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.testng.annotations.Test; import java.time.Clock; @@ -38,7 +41,7 @@ public void testSizeCache() { BaseEventStore mockEventStore = mock(BaseEventStore.class); AbstractQueueService queueService = new AbstractQueueService(mockEventStore, mock(JobService.class), - mock(JobHandlerRegistry.class), mock(JobType.class), clock, new MetricRegistry()){}; + mock(JobHandlerRegistry.class), mock(JobType.class), clock, mock(KafkaAdminService.class), mock(KafkaProducerService.class), mock(StepFunctionService.class)){}; // At limit=500, size estimate should be at 4800 // At limit=50, size estimate should be at 5000 diff --git a/sdk/pom.xml b/sdk/pom.xml index e8f8b7eb5b..5512600c23 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/sor-api/pom.xml b/sor-api/pom.xml index d7bc799c60..ce3e5d327a 100644 --- a/sor-api/pom.xml +++ b/sor-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/sor-client-common/pom.xml b/sor-client-common/pom.xml index 8028d79623..26a5da4e91 100644 --- a/sor-client-common/pom.xml +++ b/sor-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/sor-client-jersey2/pom.xml b/sor-client-jersey2/pom.xml index 4141cd9d99..fc2d848a95 100644 --- a/sor-client-jersey2/pom.xml +++ b/sor-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/sor-client/pom.xml b/sor-client/pom.xml index 100d90210e..33939689ea 100644 --- a/sor-client/pom.xml +++ b/sor-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/sor/pom.xml b/sor/pom.xml index d2a61758ef..b58f592892 100644 --- a/sor/pom.xml +++ b/sor/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/table/pom.xml b/table/pom.xml index e4592dd6b0..1e1b16c4cd 100644 --- a/table/pom.xml +++ b/table/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/uac-api/pom.xml b/uac-api/pom.xml index b9ba2a6008..57846a2fa4 100644 --- a/uac-api/pom.xml +++ b/uac-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/uac-client-jersey2/pom.xml b/uac-client-jersey2/pom.xml index 38984d2abd..b1437d89e2 100644 --- a/uac-client-jersey2/pom.xml +++ b/uac-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/uac-client/pom.xml b/uac-client/pom.xml index 55aae7fdb8..c28d926330 100644 --- a/uac-client/pom.xml +++ b/uac-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/web-local/pom.xml b/web-local/pom.xml index 4535e14494..94fb04b7ab 100644 --- a/web-local/pom.xml +++ b/web-local/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/web/pom.xml b/web/pom.xml index eadb8ccbf7..acd9bdd28a 100644 --- a/web/pom.xml +++ b/web/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java index 00c380af6a..b86b38943d 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java @@ -381,8 +381,6 @@ public Collection getTablePlacements(@Authenticated Subject subject) { _getTablePlacementsRequestsByApiKey.getUnchecked(subject.getId()).mark(); return _blobStore.getTablePlacements(); } - - /** * Retrieves the current version of a piece of content from the data store. */ @@ -600,4 +598,4 @@ private Supplier onceOnlySupplier(final InputStream in) { return in; }; } -} +} \ No newline at end of file diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java index c6dcd408fc..34c32181e9 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java @@ -110,6 +110,20 @@ public SuccessResponse sendBatch(@PathParam("queue") String queue, Collection messages) { + // Not partitioned--any server can write messages to Cassandra. + _queueService.sendAll(queue, messages,true); + return SuccessResponse.instance(); + } @POST @Path("_sendbatch") diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java index ff6334db05..e946acf8aa 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java @@ -35,6 +35,7 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.Map; @@ -120,6 +121,24 @@ public SuccessResponse sendBatch(@PathParam("queue") String queue, Collection events) { + //TODO change query param name / type + // Not partitioned--any server can write messages to Cassandra. + _queueService.sendAll(queue, events, true); + return SuccessResponse.instance(); + } + @POST @Path("_sendbatch") @Consumes(MediaType.APPLICATION_JSON) diff --git a/yum/pom.xml b/yum/pom.xml index adee7cf547..731621e189 100644 --- a/yum/pom.xml +++ b/yum/pom.xml @@ -4,7 +4,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.185-SNAPSHOT ../parent/pom.xml