diff --git a/auth/auth-client/pom.xml b/auth/auth-client/pom.xml index 796cf6ba45..1890a704a5 100644 --- a/auth/auth-client/pom.xml +++ b/auth/auth-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/auth/auth-core/pom.xml b/auth/auth-core/pom.xml index c92569720a..523af72412 100644 --- a/auth/auth-core/pom.xml +++ b/auth/auth-core/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/auth/auth-store/pom.xml b/auth/auth-store/pom.xml index b376c46858..b611c249b5 100644 --- a/auth/auth-store/pom.xml +++ b/auth/auth-store/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/auth/auth-util/pom.xml b/auth/auth-util/pom.xml index 4b364c87cd..59a7d10563 100644 --- a/auth/auth-util/pom.xml +++ b/auth/auth-util/pom.xml @@ -3,7 +3,7 @@ emodb com.bazaarvoice.emodb - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../pom.xml 4.0.0 diff --git a/blob-api/pom.xml b/blob-api/pom.xml index a2ca881d88..5d7b92e667 100644 --- a/blob-api/pom.xml +++ b/blob-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/blob-clients/blob-client-common/pom.xml b/blob-clients/blob-client-common/pom.xml index 7eb85bd767..a3590ed25f 100644 --- a/blob-clients/blob-client-common/pom.xml +++ b/blob-clients/blob-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/blob-clients/blob-client-jersey2/pom.xml b/blob-clients/blob-client-jersey2/pom.xml index 9acc0a03b6..771fdf0354 100644 --- a/blob-clients/blob-client-jersey2/pom.xml +++ b/blob-clients/blob-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git 
a/blob-clients/blob-client/pom.xml b/blob-clients/blob-client/pom.xml index 2f2dd1dfb4..d7e1c2fcef 100644 --- a/blob-clients/blob-client/pom.xml +++ b/blob-clients/blob-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/blob/pom.xml b/blob/pom.xml index 39598e29bb..0e243280df 100644 --- a/blob/pom.xml +++ b/blob/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/cachemgr/pom.xml b/cachemgr/pom.xml index f28a4d0361..874dca3e62 100644 --- a/cachemgr/pom.xml +++ b/cachemgr/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/common/api/pom.xml b/common/api/pom.xml index 91ad32bf80..51fff44d63 100644 --- a/common/api/pom.xml +++ b/common/api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/common/astyanax/pom.xml b/common/astyanax/pom.xml index f9d2f9b184..4675d2f463 100644 --- a/common/astyanax/pom.xml +++ b/common/astyanax/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/common/client-jax-rs-2/pom.xml b/common/client-jax-rs-2/pom.xml index 386fe77543..d4b2e57e73 100644 --- a/common/client-jax-rs-2/pom.xml +++ b/common/client-jax-rs-2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/common/client-jersey2/pom.xml b/common/client-jersey2/pom.xml index 98c0636af8..7cb8297b46 100644 --- a/common/client-jersey2/pom.xml +++ b/common/client-jersey2/pom.xml @@ -5,7 +5,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/common/client/pom.xml b/common/client/pom.xml index b061d2593a..2003d93270 100644 --- a/common/client/pom.xml +++ 
b/common/client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/common/dropwizard/pom.xml b/common/dropwizard/pom.xml index 7497d6f5e8..b19cb6b489 100644 --- a/common/dropwizard/pom.xml +++ b/common/dropwizard/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/common/jersey-client/pom.xml b/common/jersey-client/pom.xml index 28f181ade5..6eaf3310ee 100644 --- a/common/jersey-client/pom.xml +++ b/common/jersey-client/pom.xml @@ -5,7 +5,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/common/json/pom.xml b/common/json/pom.xml index 4677765c64..83dc90371a 100644 --- a/common/json/pom.xml +++ b/common/json/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/common/stash/pom.xml b/common/stash/pom.xml index 37c652b0eb..af7efbd3b8 100644 --- a/common/stash/pom.xml +++ b/common/stash/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/common/uuid/pom.xml b/common/uuid/pom.xml index 6a70e7ca21..4cf563bb24 100644 --- a/common/uuid/pom.xml +++ b/common/uuid/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/common/zookeeper/pom.xml b/common/zookeeper/pom.xml index 61919516b8..7e07c275bc 100644 --- a/common/zookeeper/pom.xml +++ b/common/zookeeper/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/databus-api/pom.xml b/databus-api/pom.xml index a06b547200..826efd6eac 100644 --- a/databus-api/pom.xml +++ b/databus-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git 
a/databus-api/src/main/java/com/bazaarvoice/emodb/databus/api/Databus.java b/databus-api/src/main/java/com/bazaarvoice/emodb/databus/api/Databus.java index 1a106eedc8..45206ee902 100644 --- a/databus-api/src/main/java/com/bazaarvoice/emodb/databus/api/Databus.java +++ b/databus-api/src/main/java/com/bazaarvoice/emodb/databus/api/Databus.java @@ -52,6 +52,10 @@ Subscription getSubscription(String subscription) */ long getEventCountUpTo(String subscription, long limit); + default long getMasterCount(){ + return 0; + }; + /** Counts the number of events with outstanding claims that cause the events to be skipped by {@link #poll}. */ long getClaimCount(String subscription); diff --git a/databus-client-common/pom.xml b/databus-client-common/pom.xml index 8a0ed365f9..739a9e4916 100644 --- a/databus-client-common/pom.xml +++ b/databus-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/databus-client-jersey2/pom.xml b/databus-client-jersey2/pom.xml index be936e3ae6..3d9abfe14f 100644 --- a/databus-client-jersey2/pom.xml +++ b/databus-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/databus-client/pom.xml b/databus-client/pom.xml index 7c9dab8850..6adb088d7e 100644 --- a/databus-client/pom.xml +++ b/databus-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/databus/pom.xml b/databus/pom.xml index 7efe1fb644..810134724e 100644 --- a/databus/pom.xml +++ b/databus/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DatabusFactory.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DatabusFactory.java index b0f72adc7e..3c2cc1b3c0 100644 --- 
a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DatabusFactory.java +++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DatabusFactory.java @@ -78,6 +78,11 @@ public long getEventCountUpTo(String subscription, long limit) { return _ownerAwareDatabus.getEventCountUpTo(ownerId, subscription, limit); } + @Override + public long getMasterCount() { + return _ownerAwareDatabus.getMasterEventCountUncached(ownerId); + } + @Override public long getClaimCount(String subscription) { return _ownerAwareDatabus.getClaimCount(ownerId, subscription); diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultDatabus.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultDatabus.java index d9a761d562..128a835d90 100644 --- a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultDatabus.java +++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultDatabus.java @@ -411,6 +411,18 @@ public long getEventCount(String ownerId, String subscription) { return getEventCountUpTo(ownerId, subscription, Long.MAX_VALUE); } + @Override + public long getMasterEventCountUncached(String ownerId) { + _log.info("Inside getMasterEventCountUncached with _masterFanoutChannels length {}", _masterFanoutChannels.size()); + long size = 0; + for(String channel : _masterFanoutChannels) { + size += _eventStore.getSizeEstimate(channel, Long.MAX_VALUE); + _log.info("From channel size {} {}:", channel, size); + } + return size; + } + + @Override public long getEventCountUpTo(String ownerId, String subscription, long limit) { checkSubscriptionOwner(ownerId, subscription); diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/OwnerAwareDatabus.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/OwnerAwareDatabus.java index d6ee266fb8..8c3117693b 100644 --- a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/OwnerAwareDatabus.java +++ 
b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/OwnerAwareDatabus.java @@ -41,6 +41,9 @@ Subscription getSubscription(String ownerId, String subscription) long getEventCount(String ownerId, String subscription) throws UnauthorizedSubscriptionException; + public long getMasterEventCountUncached(String ownerId) + throws UnauthorizedSubscriptionException; + long getEventCountUpTo(String ownerId, String subscription, long limit) throws UnauthorizedSubscriptionException; diff --git a/datacenter/pom.xml b/datacenter/pom.xml index f2e61457c4..016d15560d 100644 --- a/datacenter/pom.xml +++ b/datacenter/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/event/pom.xml b/event/pom.xml index 17c5654cd6..5112994bcf 100644 --- a/event/pom.xml +++ b/event/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/job-api/pom.xml b/job-api/pom.xml index d245afbd9a..2cbd6ef1b4 100644 --- a/job-api/pom.xml +++ b/job-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/job/pom.xml b/job/pom.xml index e8d993d42f..394a0a1d3f 100644 --- a/job/pom.xml +++ b/job/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/kafka/pom.xml b/kafka/pom.xml index 01cd9ac276..d2e6de7ccf 100644 --- a/kafka/pom.xml +++ b/kafka/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/megabus/pom.xml b/megabus/pom.xml index 0da2ca4e4e..c2e44a8941 100644 --- a/megabus/pom.xml +++ b/megabus/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/parent/pom.xml b/parent/pom.xml index 4982822cf1..3c38e1aa12 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -11,7 
+11,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT pom EmoDB Parent @@ -20,7 +20,7 @@ scm:git:git@github.com:bazaarvoice/emodb.git scm:git:git@github.com:bazaarvoice/emodb.git scm:git:git@github.com:bazaarvoice/emodb.git - emodb-6.5.98 + emodb-6.5.208-SNAPSHOT diff --git a/plugins/pom.xml b/plugins/pom.xml index bcdc8c9433..0adfbc7572 100644 --- a/plugins/pom.xml +++ b/plugins/pom.xml @@ -4,7 +4,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/pom.xml b/pom.xml index ad99f57870..3c51863038 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT parent/pom.xml @@ -18,7 +18,7 @@ scm:git:git@github.com:bazaarvoice/emodb.git scm:git:git@github.com:bazaarvoice/emodb.git scm:git:git@github.com:bazaarvoice/emodb.git - emodb-6.5.98 + emodb-6.5.208-SNAPSHOT diff --git a/quality/integration/pom.xml b/quality/integration/pom.xml index f577ab1224..4deee0ebee 100644 --- a/quality/integration/pom.xml +++ b/quality/integration/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../../parent/pom.xml diff --git a/quality/integration/src/test/java/test/integration/auth/TableAuthIdentityManagerDAOTest.java b/quality/integration/src/test/java/test/integration/auth/TableAuthIdentityManagerDAOTest.java index c09cceaa38..541c385863 100644 --- a/quality/integration/src/test/java/test/integration/auth/TableAuthIdentityManagerDAOTest.java +++ b/quality/integration/src/test/java/test/integration/auth/TableAuthIdentityManagerDAOTest.java @@ -8,6 +8,7 @@ import com.bazaarvoice.emodb.sor.api.Intrinsic; import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore; import com.bazaarvoice.emodb.sor.delta.Deltas; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs; import com.codahale.metrics.MetricRegistry; import 
com.google.common.collect.ImmutableList; @@ -22,6 +23,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -38,7 +40,7 @@ public class TableAuthIdentityManagerDAOTest { */ @Test public void testRebuildIdIndex() { - DataStore dataStore = new InMemoryDataStore(new MetricRegistry()); + DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class)); Supplier idSupplier = () -> "id0"; TableAuthIdentityManagerDAO tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>( ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys", @@ -76,7 +78,7 @@ public void testRebuildIdIndex() { @Test public void testGrandfatheredInId() { - DataStore dataStore = new InMemoryDataStore(new MetricRegistry()); + DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class)); Supplier idSupplier = () -> "id0"; TableAuthIdentityManagerDAO tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>( ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys", @@ -128,7 +130,7 @@ public void testGrandfatheredInId() { @Test public void testIdAttributeCompatibility() { - DataStore dataStore = new InMemoryDataStore(new MetricRegistry()); + DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class)); Supplier idSupplier = () -> "id0"; TableAuthIdentityManagerDAO tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>( ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys", diff --git a/quality/integration/src/test/java/test/integration/auth/TableRoleManagerDAOTest.java b/quality/integration/src/test/java/test/integration/auth/TableRoleManagerDAOTest.java index 6b02653a6c..409d5193db 100644 --- 
a/quality/integration/src/test/java/test/integration/auth/TableRoleManagerDAOTest.java +++ b/quality/integration/src/test/java/test/integration/auth/TableRoleManagerDAOTest.java @@ -18,6 +18,7 @@ import com.bazaarvoice.emodb.sor.api.WriteConsistency; import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore; import com.bazaarvoice.emodb.sor.delta.Deltas; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.web.auth.EmoPermissionResolver; import com.codahale.metrics.MetricRegistry; import com.google.common.collect.ImmutableList; @@ -36,10 +37,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -63,7 +61,7 @@ public class TableRoleManagerDAOTest { @BeforeMethod public void setUp() { // DataStore and PermissionManager are fairly heavy to fully mock. 
Use spies on in-memory implementations instead - _backendDataStore = new InMemoryDataStore(new MetricRegistry()); + _backendDataStore = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class)); _dataStore = spy(_backendDataStore); _permissionResolver = new EmoPermissionResolver(null, null); _backendPermissionManager = new InMemoryPermissionManager(_permissionResolver); diff --git a/quality/integration/src/test/java/test/integration/sor/CasStashTableTest.java b/quality/integration/src/test/java/test/integration/sor/CasStashTableTest.java index a90855e41e..814e6d3f36 100644 --- a/quality/integration/src/test/java/test/integration/sor/CasStashTableTest.java +++ b/quality/integration/src/test/java/test/integration/sor/CasStashTableTest.java @@ -17,6 +17,7 @@ import com.bazaarvoice.emodb.sor.db.astyanax.DeltaPlacementFactory; import com.bazaarvoice.emodb.sor.delta.Delta; import com.bazaarvoice.emodb.sor.delta.Deltas; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.table.db.Table; import com.bazaarvoice.emodb.table.db.astyanax.AstyanaxTableDAO; import com.bazaarvoice.emodb.table.db.astyanax.CQLStashTableDAO; @@ -97,7 +98,7 @@ public void setup() throws Exception { _astyanaxTableDAO.setCQLStashTableDAO(cqlStashTableDAO); // Don't store table definitions in the actual backing store so as not to interrupt other tests. Use a // private in-memory implementation. 
- _tableBackingStore = new InMemoryDataStore(new MetricRegistry()); + _tableBackingStore = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class)); _astyanaxTableDAO.setBackingStore(_tableBackingStore); _lifeCycleRegistry.start(); diff --git a/quality/pom.xml b/quality/pom.xml index 24ba2fc7a9..582cd1812d 100644 --- a/quality/pom.xml +++ b/quality/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/queue-api/pom.xml b/queue-api/pom.xml index 284f187d45..639dbef3b1 100644 --- a/queue-api/pom.xml +++ b/queue-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java index 3f5fd99950..714121e0e7 100644 --- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java +++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java @@ -27,6 +27,11 @@ public interface BaseQueueService { */ long getMessageCount(String queue); + /** + * Get the queue size without caching + *

+ * returns long + */ default long getUncachedSize(String queue){ return 0; } diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java index a6dc77515b..5952c87b08 100644 --- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java +++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java @@ -15,6 +15,7 @@ public interface DedupQueueService extends BaseQueueService { void sendAll(Map> messagesByQueue); + //Overloaded sendAll method to send to cassandra void sendAll(String queue, Collectionmessages, boolean isFlush); /** @@ -26,6 +27,10 @@ public interface DedupQueueService extends BaseQueueService { */ long getMessageCount(String queue); + default long getUncachedSize(String queue){ + return 0; + } + /** * Counts the total number of messages for the specified queue, accurate up to the specified limit. Beyond the * specified limit the message count will be a rough estimate, allowing the caller to make the trade-off between diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java index 4d533c755e..c71e80d4d1 100644 --- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java +++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java @@ -13,7 +13,6 @@ public interface QueueService extends BaseQueueService { void sendAll(String queue, Collection messages); - void sendAll(Map> messagesByQueue); //Overloaded sendAll method to send to cassandra @@ -28,6 +27,15 @@ public interface QueueService extends BaseQueueService { */ long getMessageCount(String queue); + /** + * Get the queue size without caching + *

+ * returns long + */ + default long getUncachedSize(String queue){ + return 0; + } + /** * Counts the total number of messages for the specified queue, accurate up to the specified limit. Beyond the * specified limit the message count will be a rough estimate, allowing the caller to make the trade-off between diff --git a/queue-client-common/pom.xml b/queue-client-common/pom.xml index 6ce174aa59..906e8d8eba 100644 --- a/queue-client-common/pom.xml +++ b/queue-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/queue-client-jersey2/pom.xml b/queue-client-jersey2/pom.xml index 062f708470..2e393c164c 100644 --- a/queue-client-jersey2/pom.xml +++ b/queue-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/queue-client/pom.xml b/queue-client/pom.xml index f233b22dbd..56cde86315 100644 --- a/queue-client/pom.xml +++ b/queue-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/queue/pom.xml b/queue/pom.xml index 0ec14488a6..fa82ee16cd 100644 --- a/queue/pom.xml +++ b/queue/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java index b42d33b02c..dac7ec6975 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java @@ -84,15 +84,12 @@ protected void configure() { bind(DedupEventStoreChannels.class).toInstance(DedupEventStoreChannels.isolated("__dedupq_write:", "__dedupq_read:")); bind(new TypeLiteral>() {}).annotatedWith(DedupEnabled.class).toInstance(Suppliers.ofInstance(true)); install(new EventStoreModule("bv.emodb.queue", 
_metricRegistry)); - - // Bind Kafka services - bind (KafkaAdminService.class).asEagerSingleton(); + bind(KafkaAdminService.class).asEagerSingleton(); bind(KafkaProducerService.class).asEagerSingleton(); // Bind Step Function Service bind(StepFunctionService.class).asEagerSingleton(); - // Bind the Queue instance that the rest of the application will consume bind(QueueService.class).to(DefaultQueueService.class).asEagerSingleton(); expose(QueueService.class); diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java index 0793a08b2a..c16657aa96 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java @@ -454,12 +454,11 @@ private void startStepFunctionExecution(Map parameters, String q String inputPayload = createInputPayload(queueThreshold, batchSize, queueType, queueName, topic, interval); - // Create the timestamp - String timestamp = String.valueOf(System.currentTimeMillis()); // Current time in milliseconds - + // Current time in milliseconds + String timestamp = String.valueOf(System.currentTimeMillis()); + queueName = stepFunctionService.sanitizeExecutionName(queueName); // Check if queueType is "dedupq" and prepend "D" to execution name if true String executionName = (queueType.equalsIgnoreCase("dedupq") ? 
"D_" : "") + queueName + "_" + timestamp; - // Start the Step Function execution stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName); diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/Entities/ExecutionInputWrapper.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/Entities/ExecutionInputWrapper.java new file mode 100644 index 0000000000..13f396fe9a --- /dev/null +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/Entities/ExecutionInputWrapper.java @@ -0,0 +1,14 @@ +package com.bazaarvoice.emodb.queue.core.Entities; + +public class ExecutionInputWrapper { + private QueueExecutionAttributes executionInput; + + // Getter and Setter + public QueueExecutionAttributes getExecutionInput() { + return executionInput; + } + + public void setExecutionInput(QueueExecutionAttributes executionInput) { + this.executionInput = executionInput; + } +} diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/Entities/QueueExecutionAttributes.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/Entities/QueueExecutionAttributes.java new file mode 100644 index 0000000000..a35b60cbaf --- /dev/null +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/Entities/QueueExecutionAttributes.java @@ -0,0 +1,109 @@ +package com.bazaarvoice.emodb.queue.core.Entities; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.Map; +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class QueueExecutionAttributes { + + private String queueName; + private String queueType; + private String topicName; + private Integer queueThreshold; + private Integer batchSize; + private Integer interval; + private String status; + + public void 
setQueueName(String queueName) { + this.queueName = queueName; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public void setQueueType(String queueType) { + this.queueType = queueType; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + public void setQueueThreshold(int queueThreshold) { + this.queueThreshold = queueThreshold; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + public void setInterval(int interval) { + this.interval = interval; + } + + public Integer getQueueThreshold() { + return queueThreshold; + } + + public Integer getBatchSize() { + return batchSize; + } + + public Integer getInterval() { + return interval; + } + + public String getQueueName() { + return queueName; + } + + public String getQueueType() { + return queueType; + } + + public String getTopicName() { + return topicName; + } + + public String getJsonPayload(QueueExecutionAttributes attributes) throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(attributes); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof QueueExecutionAttributes)) return false; + QueueExecutionAttributes that = (QueueExecutionAttributes) o; + return Objects.equals(getQueueName(), that.getQueueName()) && Objects.equals(getQueueType(), that.getQueueType()) && Objects.equals(getTopicName(), that.getTopicName()) && Objects.equals(getQueueThreshold(), that.getQueueThreshold()) && Objects.equals(getBatchSize(), that.getBatchSize()) && Objects.equals(getInterval(), that.getInterval()); + } + + @Override + public int hashCode() { + return Objects.hash(getQueueName(), getQueueType(), getTopicName(), getQueueThreshold(), getBatchSize(), getInterval()); + } + + @Override + public String toString() { + return "QueueExecutionAttributes{" + + "queueName='" + 
queueName + '\'' + + ", queueType='" + queueType + '\'' + + ", topicName='" + topicName + '\'' + + ", queueThreshold=" + queueThreshold + + ", batchSize=" + batchSize + + ", interval=" + interval + + ", status='" + status + '\'' + + '}'; + } +} diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java index 13011e8bcc..c4ab54ad99 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java @@ -58,7 +58,7 @@ public class KafkaConfig { lingerMsConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/lingerMs", "1"); // Configures the Kafka broker addresses for producer connections. - bootstrapServersConfig = parameterValues.get("/" + UNIVERSE + "/emodb/kafka/bootstrapServers"); + bootstrapServersConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/bootstrapServers", "localhost:9092"); logger.info("Kafka configurations loaded successfully from SSM."); } catch (AmazonServiceException e) { diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java index 1a7cc72eee..efbfc5f27c 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java @@ -46,7 +46,7 @@ public void sendMessages(String topic, Collection events, String queueTy throw new RuntimeException("Error sending messages to Kafka", e); } } - _log.info("Finished sending messages to topic '{}' time taken : {} milliseconds", topic, Duration.between(startTime,LocalDateTime.now()).toMillis()); + _log.info("Finished sending messages to topic '{}' time taken : {} milliseconds", topic, Duration.between(startTime, 
LocalDateTime.now()).toMillis()); } diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java index d2d65c144f..f1bfdb0d01 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java @@ -4,6 +4,8 @@ import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder; import com.amazonaws.services.simplesystemsmanagement.model.GetParameterRequest; import com.amazonaws.services.simplesystemsmanagement.model.GetParameterResult; +import com.amazonaws.services.simplesystemsmanagement.model.PutParameterRequest; +import com.amazonaws.services.simplesystemsmanagement.model.PutParameterResult; import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest; import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult; import com.amazonaws.services.simplesystemsmanagement.model.ParameterNotFoundException; @@ -104,4 +106,22 @@ public Map getParameters(List parameterNames) { } } + public Long updateParameter(String key, String value) { + try { + if (key == null || key.trim().isEmpty()) { + logger.error("parameter name cannot be null or blank"); + throw new IllegalArgumentException("parameter name cannot be null or blank"); + } + + PutParameterRequest request = new PutParameterRequest().withName(key).withValue(value).withOverwrite(true); + + PutParameterResult response = ssmClient.putParameter(request); + logger.info("Successfully updated parameter: " + key + " with value: " + value + ", Update Version: " + response.getVersion()); + return response.getVersion(); + } catch (Exception e) { + logger.error("Failed to update parameter: " + key + " with value: " + value, e); + throw new RuntimeException("Unexpected error updating parameter: " + key + " with value: " + value, e); + } + } 
+ } diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java index ec4165d03f..a41d95a5a7 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java @@ -3,10 +3,21 @@ import com.amazonaws.services.stepfunctions.AWSStepFunctions; import com.amazonaws.services.stepfunctions.AWSStepFunctionsClientBuilder; +import com.amazonaws.services.stepfunctions.model.*; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaConfig; import com.amazonaws.services.stepfunctions.model.StartExecutionRequest; import com.amazonaws.services.stepfunctions.model.StartExecutionResult; +import com.bazaarvoice.emodb.queue.core.Entities.QueueExecutionAttributes; +import com.bazaarvoice.emodb.queue.core.Entities.ExecutionInputWrapper; +import com.bazaarvoice.emodb.queue.core.ssm.ParameterStoreUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; + +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.util.List; +import java.util.MissingResourceException; /** * Service to interact with AWS Step Functions using AWS SDK v1. @@ -16,15 +27,171 @@ public class StepFunctionService { private static final Logger logger = LoggerFactory.getLogger(StepFunctionService.class); private final AWSStepFunctions stepFunctionsClient; + private static String universe; + private final ParameterStoreUtil _parameterStoreUtil; /** * Constructor to initialize Step Function Client with AWS region and credentials. 
*/ public StepFunctionService() { - this.stepFunctionsClient = AWSStepFunctionsClientBuilder.standard() - .build(); + universe=KafkaConfig.getUniverseFromEnv(); + this._parameterStoreUtil = new ParameterStoreUtil(); + this.stepFunctionsClient = AWSStepFunctionsClientBuilder.standard().build(); + } + + /** + * Sanitizes the execution name by replacing invalid characters with underscores + * and truncating if needed. + */ + public String sanitizeExecutionName(String executionName) { + if (executionName == null || executionName.isEmpty()) { + throw new IllegalArgumentException("Execution name cannot be null or empty"); + } + executionName = executionName.trim(); + // Replace invalid characters with underscores + String sanitized = executionName.replaceAll("[^a-zA-Z0-9\\-_]", "_"); + + // Check if the sanitized name is empty or consists only of underscores + if (sanitized.isEmpty() || sanitized.replaceAll("_", "").isEmpty()) { + throw new IllegalArgumentException("Execution name cannot contain only invalid characters"); + } + + // Truncate from the beginning if length exceeds 66 characters + if (sanitized.length() > 66) { + sanitized = sanitized.substring(sanitized.length() - 66); + } + + // Log the updated execution name if it has changed + if (!sanitized.equals(executionName)) { + logger.info("Updated execution name: {}", sanitized); + } + return sanitized; + } + + + /** + * Starts the execution of a Step Function with provided execution attributes. + * + * @param queueExecutionAttributes execution input attributes + * @throws RuntimeException If method fails to re-start or start with provided execution input attributes + * + * queueType and queueName are mandatory inputs + * CASE-1 (status = "DISABLED" provided) : active execution if any, stops. + * CASE-2 (all 4 inputs(qt, bs, i, tn) provided): a new execution is started with these attributes, stopping any active one. 
+ * CASE-3 (any/all of 4 inputs(qt, bs, i, tn) missing): If any active execution exist, a new execution is started with provided inputs updated, stopping the active one + * CASE-4 (any/all of 4 inputs(qt, bs, i, tn) missing): If any active execution doesn't exist, Exception occurs, IllegalArgumentException + */ + public void startSFNWithAttributes(QueueExecutionAttributes queueExecutionAttributes) { + QueueExecutionAttributes existingAttributes; + + //1. fetch attributes for any existing execution + try { + existingAttributes = getExistingSFNAttributes(queueExecutionAttributes.getQueueType(), queueExecutionAttributes.getQueueName()); + } catch (Exception e) { + logger.error("Error getting existing step-function attributes for " + queueExecutionAttributes + " | " + e.getMessage()); + throw new RuntimeException("Error getting existing step-function attributes for " + queueExecutionAttributes + " | " + e.getMessage()); + } + + //2. if no running execution exists, start a new one with provided/new attributes + if (existingAttributes == null) { + try { + startExecution(queueExecutionAttributes.getQueueType(), queueExecutionAttributes.getQueueName(), queueExecutionAttributes); + return; + } catch(Exception e){ + logger.error("Error starting step-function with attributes " + queueExecutionAttributes + " | " + e.getMessage()); + throw new RuntimeException("Error starting step-function with attributes " + queueExecutionAttributes + " | " + e.getMessage()); + } + } + + //3. check sanity of starting a new execution before stopping the older execution. + syncFreshAttributesFromExistingExecution(queueExecutionAttributes, existingAttributes); + + //4. 
stop active execution (if any) + try { + stopActiveExecutions(queueExecutionAttributes.getQueueType(), queueExecutionAttributes.getQueueName()); + logger.info("Successfully stopped active execution(if any) for queueName: " + queueExecutionAttributes.getQueueName() + ", queueType: " + queueExecutionAttributes.getQueueType()); + } catch(Exception e){ + logger.error("Error stopping step-function for queueName: " + queueExecutionAttributes.getQueueName() + ", queueType: " + queueExecutionAttributes.getQueueType() + " | " + e.getMessage()); + throw new RuntimeException("Error stopping step-function for queueName: " + queueExecutionAttributes.getQueueName() + ", queueType: " + queueExecutionAttributes.getQueueType() + " | " + e.getMessage()); + } + + //5. if new attributes can't start a fresh execution, re-start the already running sfn, else start a fresh execution with new attributes + try { + startExecution(queueExecutionAttributes.getQueueType(), queueExecutionAttributes.getQueueName(), queueExecutionAttributes); + } catch (Exception e){ + logger.error("Error re-starting step-function with attributes " + queueExecutionAttributes + " | " + e.getMessage()); + throw new RuntimeException("Error re-starting step-function with attributes " + queueExecutionAttributes + "|" + e.getMessage()); + } + } + + + /** + * Starts an execution of step-function associated with (queueType, queueName), with provided attributes. 
+ * + * @param queueType queueType + * @param queueName queueName + * @param executionAttributes execution inputs + * + */ + public void startExecution(String queueType, String queueName, QueueExecutionAttributes executionAttributes) throws JsonProcessingException { + + if(executionAttributes == null) { + throw new IllegalArgumentException("execution input object can't be null"); + } + + if(executionAttributes.getStatus() == null || executionAttributes.getStatus().isEmpty()) { + executionAttributes.setStatus("ENABLED"); + } + + if(executionAttributes.getStatus().equals("DISABLED")) { + logger.info("step-function's execution can't be triggered because status=DISABLED provided" ); + return; + } + + String payload = constructPayload(queueName, queueType, executionAttributes); + + try { + String stateMachineArn = getStateMachineARN(); + String executionName = (queueType.equalsIgnoreCase("dedup") ? "D_" : "") + queueName + "_" + System.currentTimeMillis(); + StartExecutionRequest startExecutionRequest = new StartExecutionRequest().withStateMachineArn(stateMachineArn) + .withInput(payload) + .withName(executionName); + + StartExecutionResult startExecutionResult = stepFunctionsClient.startExecution(startExecutionRequest); + + logger.info("Successfully started execution for state machine ARN: {}", stateMachineArn); + logger.debug("Execution ARN: {}", startExecutionResult.getExecutionArn()); + + } catch (StateMachineDoesNotExistException e) { + logger.error("State Machine does not exist for queue_type: " + queueType + ", queue_name: " + queueName, e); + } catch (InvalidArnException e) { + logger.error("Invalid ARN provided for queue_type: " + queueType + ", queue_name: " + queueName, e); + } catch (InvalidExecutionInputException e) { + logger.error("Invalid execution input provided: {}", payload, e); + } catch (AWSStepFunctionsException e) { + logger.error("Error executing Step Function: {}", e.getMessage(), e); + throw e; // Re-throw after logging + } catch (Exception e) 
{ + logger.error("Unexpected error occurred during Step Function execution: {}", e.getMessage(), e); + throw e; // Re-throw unexpected exceptions + } } + + private String constructPayload(String queueName, String queueType, QueueExecutionAttributes executionAttributes) throws JsonProcessingException { + + validateExecutionInputs(queueName, queueType, executionAttributes); + executionAttributes.setQueueType(queueType); + executionAttributes.setQueueName(queueName); + + ExecutionInputWrapper executionInputWrapper = new ExecutionInputWrapper(); + executionInputWrapper.setExecutionInput(executionAttributes); + + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.writeValueAsString(executionInputWrapper); + } + + /** * Starts the execution of a Step Function with the given state machine ARN and input payload. * @@ -59,4 +226,207 @@ public void startExecution(String stateMachineArn, String inputPayload, String e throw e; } } + + + /** + * Gets execution inputs of an active/running step-function associated with (queueType, queueName). + * + * @param queueType queueType + * @param queueName queueName + * + * @return valid QueueExecutionAttributes : if any active execution exists, else NULL. 
+ * + * @throws JsonProcessingException If execution input attributes fails in getting converted to a valid execution payload json + */ + public QueueExecutionAttributes getExistingSFNAttributes(String queueType, String queueName) throws JsonProcessingException { + try { + List executionItemList = getAllActiveExecutionArns(); + + for(ExecutionListItem executionItem : executionItemList) { + String executionARN = executionItem.getExecutionArn(); + + DescribeExecutionRequest describeExecutionRequest = new DescribeExecutionRequest().withExecutionArn(executionARN); + DescribeExecutionResult describeExecutionResult = stepFunctionsClient.describeExecution(describeExecutionRequest); + + String existingInputPayload = describeExecutionResult.getInput(); + QueueExecutionAttributes queueExecutionAttributes = new ObjectMapper().readValue(existingInputPayload, ExecutionInputWrapper.class).getExecutionInput(); + + if(queueExecutionAttributes.getQueueType() != null && queueExecutionAttributes.getQueueType().equals(queueType) + && queueExecutionAttributes.getQueueName() != null && queueExecutionAttributes.getQueueName().equals(queueName)) { + logger.info("Fetched attributes for executionArn: " + executionARN + " => " + queueExecutionAttributes); + return queueExecutionAttributes; + } + } + + logger.info("No active executions found for queue_type: " + queueType + ", queue_name: " + queueName + " stateMachineARN: "); + return null; + + } catch (Exception e) { + logger.error("Unexpected error in fetching sfn attributes for queue_type: " + queueType + ", queue_name: " + queueName); + throw e; + } + } + + + /** + * Stops an active execution of step-function associated with (queueType, queueName), if any. + * + * @param queueType queueType + * @param queueName queueName + * + * @throws Exception: If some glitch happens in stopping. 
+ */ + public void stopActiveExecutions(String queueType, String queueName) throws JsonProcessingException { + + try { + List executionItemList = getAllActiveExecutionArns(); + + for(ExecutionListItem executionItem : executionItemList) { + String executionARN = executionItem.getExecutionArn(); + + DescribeExecutionRequest describeExecutionRequest = new DescribeExecutionRequest().withExecutionArn(executionARN); + DescribeExecutionResult describeExecutionResult = stepFunctionsClient.describeExecution(describeExecutionRequest); + + String existingInputPayload = describeExecutionResult.getInput(); + QueueExecutionAttributes queueExecutionAttributes = new ObjectMapper().readValue(existingInputPayload, ExecutionInputWrapper.class).getExecutionInput(); + + if(queueExecutionAttributes.getQueueType() != null && queueExecutionAttributes.getQueueType().equals(queueType) + && queueExecutionAttributes.getQueueName() != null && queueExecutionAttributes.getQueueName().equals(queueName)) { + logger.info("Stopping active execution: " + executionARN); + + StopExecutionRequest stopRequest = new StopExecutionRequest().withExecutionArn(executionARN); + stepFunctionsClient.stopExecution(stopRequest); + + logger.info("Stopped execution: " + executionARN); + return; + } + } + + logger.info("No active execution arn exists for queue_type:" + queueType + ", queue_name:" + queueName); + + } catch (Exception e) { + logger.error("Failure in stopping active execution: {}", e.getMessage(), e); + throw e; + } + } + + + /** + * Gets execution ARN of an active/running step-function associated with (queueType, queueName). + * + * @return String : if any active execution exists, else NULL. 
+ */ + public List getAllActiveExecutionArns() { + + try { + String stateMachineArn = getStateMachineARN(); + + ListExecutionsRequest listExecutionRequest = new ListExecutionsRequest().withStateMachineArn(stateMachineArn) + .withStatusFilter(ExecutionStatus.RUNNING); + + ListExecutionsResult listExecutionResults = stepFunctionsClient.listExecutions(listExecutionRequest); + return listExecutionResults.getExecutions(); + + } catch (Exception e) { + logger.error("Unexpected error: {" + e.getMessage() + "} occurred while fetching all active execution ARNs", e); + throw e; + } + + } + + + /** + * Gets stateMachine ARN of a step-function from aws parameter-store. + * + * @return String: stateMachineArn + * + * @throws AWSStepFunctionsException: If some glitch happens at aws end + * @throws MissingResourceException: If state machine arn is not found/set in aws parameter store + */ + public String getStateMachineARN() { + + try { + // TODO_SHAN: Extend this fetch part later based on queueType : queue/dedup/databus + String stateMachineArn = _parameterStoreUtil.getParameter("/" + universe + "/emodb/stepfn/stateMachineArn"); + + if(stateMachineArn != null && !stateMachineArn.isEmpty()) { + return stateMachineArn; + } + } catch (Exception e) { + throw new AWSStepFunctionsException("Problem fetching state machine arn"); + } + + throw new MissingResourceException("state machine arn not found in param-store", "", ""); + } + + + private void syncFreshAttributesFromExistingExecution(QueueExecutionAttributes newQueueExecutionAttributes, QueueExecutionAttributes existingExecutionAttributes) { + + validateExecutionInputs(existingExecutionAttributes.getQueueType(), existingExecutionAttributes.getQueueName(), existingExecutionAttributes); + + if(newQueueExecutionAttributes == null) { + newQueueExecutionAttributes = new QueueExecutionAttributes(); + } + + if(newQueueExecutionAttributes.getQueueType() == null || newQueueExecutionAttributes.getQueueType().isEmpty()) { + 
newQueueExecutionAttributes.setQueueType(existingExecutionAttributes.getQueueType()); + } + + if(newQueueExecutionAttributes.getQueueName() == null || newQueueExecutionAttributes.getQueueName().isEmpty()) { + newQueueExecutionAttributes.setQueueName(existingExecutionAttributes.getQueueName()); + } + + if(newQueueExecutionAttributes.getQueueThreshold() == null) { + newQueueExecutionAttributes.setQueueThreshold(existingExecutionAttributes.getQueueThreshold()); + } + + if(newQueueExecutionAttributes.getBatchSize() == null) { + newQueueExecutionAttributes.setBatchSize(existingExecutionAttributes.getBatchSize()); + } + + if(newQueueExecutionAttributes.getInterval() == null) { + newQueueExecutionAttributes.setInterval(existingExecutionAttributes.getInterval()); + } + + if(newQueueExecutionAttributes.getTopicName() == null || newQueueExecutionAttributes.getTopicName().isEmpty()) { + newQueueExecutionAttributes.setTopicName(existingExecutionAttributes.getTopicName()); + } + + if(newQueueExecutionAttributes.getStatus() == null || newQueueExecutionAttributes.getStatus().isEmpty()) { + newQueueExecutionAttributes.setStatus(existingExecutionAttributes.getStatus()); + } + + validateExecutionInputs(newQueueExecutionAttributes.getQueueType(), newQueueExecutionAttributes.getQueueName(), newQueueExecutionAttributes); + + } + + private void validateExecutionInputs(String queueType, String queueName, QueueExecutionAttributes executionAttributes) { + if(queueName == null || queueName.isEmpty()) { + throw new IllegalArgumentException("queue name can't be null/empty"); + } + + if(queueType == null || queueType.isEmpty()) { + throw new IllegalArgumentException("queue type can't be null/empty"); + } + + if(executionAttributes == null) { + throw new IllegalArgumentException("execution attributes can't be null"); + } + + if(executionAttributes.getTopicName() == null || executionAttributes.getTopicName().isEmpty()) { + throw new IllegalArgumentException("topic name can't be null/empty"); + } 
+ + if(executionAttributes.getInterval() == null) { + throw new IllegalArgumentException("interval can't be null"); + } + + if(executionAttributes.getBatchSize() == null) { + throw new IllegalArgumentException("batch size can't be null"); + } + + if(executionAttributes.getQueueThreshold() == null) { + throw new IllegalArgumentException("queue threshold can't be null"); + } + } } \ No newline at end of file diff --git a/queue/src/test/java/com/bazaarvoice/emodb/queue/QueueModuleTest.java b/queue/src/test/java/com/bazaarvoice/emodb/queue/QueueModuleTest.java index 9263d33ef4..98b4d5870d 100644 --- a/queue/src/test/java/com/bazaarvoice/emodb/queue/QueueModuleTest.java +++ b/queue/src/test/java/com/bazaarvoice/emodb/queue/QueueModuleTest.java @@ -13,6 +13,9 @@ import com.bazaarvoice.emodb.job.api.JobService; import com.bazaarvoice.emodb.queue.api.DedupQueueService; import com.bazaarvoice.emodb.queue.api.QueueService; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; +import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; import com.bazaarvoice.ostrich.HostDiscovery; import com.codahale.metrics.MetricRegistry; import com.google.common.collect.ImmutableMap; diff --git a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionServiceTest.java b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionServiceTest.java new file mode 100644 index 0000000000..136f8d5db4 --- /dev/null +++ b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionServiceTest.java @@ -0,0 +1,199 @@ +package com.bazaarvoice.emodb.queue.core.stepfn; + +import com.amazonaws.services.stepfunctions.AWSStepFunctions; +import com.amazonaws.services.stepfunctions.model.StartExecutionRequest; +import com.amazonaws.services.stepfunctions.model.StartExecutionResult; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import 
org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.lang.reflect.Field; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + +public class StepFunctionServiceTest { + + private StepFunctionService stepFunctionService; + + @Mock + private AWSStepFunctions mockStepFunctionsClient; + + @BeforeMethod + public void setUp() throws Exception { + MockitoAnnotations.openMocks(this); + stepFunctionService = new StepFunctionService(); + + // Use reflection to set the private field stepFunctionsClient + Field field = StepFunctionService.class.getDeclaredField("stepFunctionsClient"); + field.setAccessible(true); // Make the private field accessible + field.set(stepFunctionService, mockStepFunctionsClient); // Inject mock + } + + @Test + public void testStartExecution_withValidParameters() { + // Arrange + String stateMachineArn = "arn:aws:states:us-east-1:123456789012:stateMachine:exampleStateMachine"; + String inputPayload = "{\"key\":\"value\"}"; + String executionName = "testExecution"; + + StartExecutionResult mockResult = new StartExecutionResult() + .withExecutionArn("arn:aws:states:us-east-1:123456789012:execution:exampleExecution"); + when(mockStepFunctionsClient.startExecution(any(StartExecutionRequest.class))).thenReturn(mockResult); + + // Act + stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName); + + // Assert + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(StartExecutionRequest.class); + verify(mockStepFunctionsClient).startExecution(requestCaptor.capture()); + + StartExecutionRequest request = requestCaptor.getValue(); + assertEquals(request.getStateMachineArn(), stateMachineArn); + assertEquals(request.getInput(), inputPayload); + //assertTrue(request.getName().startsWith("testExecution_")); + } + + @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "State Machine ARN cannot be null or empty") + public void testStartExecution_withNullStateMachineArn() { + // Arrange + String stateMachineArn = null; + String inputPayload = "{\"key\":\"value\"}"; + String executionName = "testExecution"; + + // Act + stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "State Machine ARN cannot be null or empty") + public void testStartExecution_withEmptyStateMachineArn() { + // Arrange + String stateMachineArn = ""; + String inputPayload = "{\"key\":\"value\"}"; + String executionName = "testExecution"; + + // Act + stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName); + } + + @Test + public void testStartExecution_withNullInputPayload() { + // Arrange + String stateMachineArn = "arn:aws:states:us-east-1:123456789012:stateMachine:exampleStateMachine"; + String executionName = "testExecution"; + + StartExecutionResult mockResult = new StartExecutionResult() + .withExecutionArn("arn:aws:states:us-east-1:123456789012:execution:exampleExecution"); + when(mockStepFunctionsClient.startExecution(any(StartExecutionRequest.class))).thenReturn(mockResult); + + // Act + stepFunctionService.startExecution(stateMachineArn, null, executionName); + + // Assert + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(StartExecutionRequest.class); + verify(mockStepFunctionsClient).startExecution(requestCaptor.capture()); + + StartExecutionRequest request = requestCaptor.getValue(); + assertEquals(request.getStateMachineArn(), stateMachineArn); + assertEquals(request.getInput(), "{}"); // Default to empty payload + } + + @Test + public void testSanitizeExecutionName_withInvalidCharacters() { + // Arrange + String invalidExecutionName = "test/execution:name*with?invalid|characters"; + + // Act + String sanitized = 
stepFunctionService.sanitizeExecutionName(invalidExecutionName); + + // Assert + assertEquals(sanitized, "test_execution_name_with_invalid_characters"); + } + + @Test + public void testSanitizeExecutionName_withTooLongName() { + // Arrange + String longExecutionName = "ThisIsAVeryLongExecutionNameThatExceedsTheMaximumAllowedLengthOfSixtyNineCharactersAndShouldBeTruncatedAtSomePoint"; + + // Act + String sanitized = stepFunctionService.sanitizeExecutionName(longExecutionName); + + // Assert + assertTrue(sanitized.length() <= 69); + } + + // New Test Cases for Edge Cases + + @Test + public void testSanitizeExecutionName_withValidName() { + // Arrange + String validExecutionName = "validExecutionName"; + + // Act + String sanitized = stepFunctionService.sanitizeExecutionName(validExecutionName); + + // Print the output + System.out.println("Sanitized Execution Name: " + sanitized); + + // Assert + assertEquals(sanitized, validExecutionName); // Should return the same name + } + + @Test + public void testSanitizeExecutionName_withLeadingAndTrailingSpaces() { + // Arrange + String executionName = " executionName "; + + // Act + String sanitized = stepFunctionService.sanitizeExecutionName(executionName); + + // Print the output + System.out.println("Sanitized Execution Name: " + sanitized); + + // Assert + assertEquals(sanitized, "executionName"); // Should trim spaces + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Execution name cannot contain only invalid characters") + public void testSanitizeExecutionName_withOnlyInvalidCharacters() { + // Arrange + String invalidOnly = "*/?|<>"; // Input with only invalid characters + + stepFunctionService.sanitizeExecutionName(invalidOnly); + } + + + @Test + public void testSanitizeExecutionName_withMaximumLength() { + // Arrange + String maxLengthExecutionName = "ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEDHDFHDFHHFCN"; // 69 characters + + // Act + String 
sanitized = stepFunctionService.sanitizeExecutionName(maxLengthExecutionName); + + // Print the output + System.out.println("Sanitized Execution Name: " + sanitized); + + // Assert + assertEquals(sanitized.length(), 66); // Should be exactly 66 characters + } + + @Test + public void testSanitizeExecutionName_withMultipleInvalidCharacters() { + // Arrange + String executionName = "test//?invalid//name?with*multiple|invalid:characters"; + + // Act + String sanitized = stepFunctionService.sanitizeExecutionName(executionName); + + // Print the output + System.out.println("Sanitized Execution Name: " + sanitized); + + // Assert + assertEquals(sanitized, "test___invalid__name_with_multiple_invalid_characters"); // Should replace all invalid characters + } +} diff --git a/sdk/pom.xml b/sdk/pom.xml index c38a76529d..85d9724b88 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/sor-api/pom.xml b/sor-api/pom.xml index 92a51e19b4..eb1dd7b070 100644 --- a/sor-api/pom.xml +++ b/sor-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/DataStore.java b/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/DataStore.java index 7a00eedf9f..711c615250 100644 --- a/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/DataStore.java +++ b/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/DataStore.java @@ -261,4 +261,10 @@ void dropFacade(String table, String placement, Audit audit) */ URI getStashRoot() throws StashNotAvailableException; + + default void updateRefInDatabus(Iterable updateRefs, Set tags, boolean isFacade) { + /* + * This method is a no-op in the default implementation. 
It is used by the Databus to update the reference + */ + } } diff --git a/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/Names.java b/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/Names.java index 34d020b1fe..9fb41535a9 100644 --- a/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/Names.java +++ b/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/Names.java @@ -19,4 +19,6 @@ public static boolean isLegalTableAttributeName(String attributeName) { // The attributes should not start with "~" which is reserved for Emodb's internal use return !attributeName.startsWith("~"); } + + } diff --git a/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/UpdateRefModel.java b/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/UpdateRefModel.java new file mode 100644 index 0000000000..168132b98a --- /dev/null +++ b/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/UpdateRefModel.java @@ -0,0 +1,63 @@ +package com.bazaarvoice.emodb.sor.api; + +import java.util.Objects; +import java.util.Set; +import java.util.UUID; + +import static java.util.Objects.hash; +import static java.util.Objects.requireNonNull; + +/** + * Reference to a System of Record update. 
+ */ +public final class UpdateRefModel { + public static final String TAGS_NAME = "~tags"; + private final String _table; + private final String _key; + private final UUID _changeId; + private final Set _tags; + + public UpdateRefModel(String table, String key, UUID changeId, Set tags) { + _table = requireNonNull(table, "table"); + _key = requireNonNull(key, "key"); + _changeId = requireNonNull(changeId, "changeId"); + _tags = requireNonNull(tags, "tags"); + } + + public String getTable() { + return _table; + } + + public String getKey() { + return _key; + } + + public UUID getChangeId() { + return _changeId; + } + + public Set getTags() { + return _tags; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof UpdateRefModel)) { + return false; + } + UpdateRefModel that = (UpdateRefModel) o; + return Objects.equals(_table, that._table) && + Objects.equals(_key, that._key) && + Objects.equals(_changeId, that._changeId) && + Objects.equals(_tags, that._tags); + } + + @Override + public int hashCode() { + return hash(_table, _key, _changeId, _tags); + } + +} \ No newline at end of file diff --git a/sor-client-common/pom.xml b/sor-client-common/pom.xml index eeda20a7da..b98b56184f 100644 --- a/sor-client-common/pom.xml +++ b/sor-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/sor-client-jersey2/pom.xml b/sor-client-jersey2/pom.xml index b596c8954e..22c8159dc1 100644 --- a/sor-client-jersey2/pom.xml +++ b/sor-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/sor-client/pom.xml b/sor-client/pom.xml index 0b1a8c727d..3bf7ae1007 100644 --- a/sor-client/pom.xml +++ b/sor-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/sor/pom.xml b/sor/pom.xml 
index 4adf86fdc3..eac36180fe 100644 --- a/sor/pom.xml +++ b/sor/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml @@ -348,5 +348,13 @@ testng test + + com.amazonaws + aws-java-sdk-ssm + + + org.apache.kafka + kafka-clients + diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/DataStoreModule.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/DataStoreModule.java index fda127a190..6c3958b265 100644 --- a/sor/src/main/java/com/bazaarvoice/emodb/sor/DataStoreModule.java +++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/DataStoreModule.java @@ -60,6 +60,7 @@ import com.bazaarvoice.emodb.sor.db.cql.CqlForMultiGets; import com.bazaarvoice.emodb.sor.db.cql.CqlForScans; import com.bazaarvoice.emodb.sor.db.cql.SorCqlSettingsTask; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.sor.log.LogbackSlowQueryLogProvider; import com.bazaarvoice.emodb.sor.log.SlowQueryLog; import com.bazaarvoice.emodb.sor.log.SlowQueryLogConfiguration; @@ -195,6 +196,7 @@ protected void configure() { bind(SlowQueryLog.class).toProvider(LogbackSlowQueryLogProvider.class); bind(HintsConsistencyTimeProvider.class).asEagerSingleton(); bind(MinLagConsistencyTimeProvider.class).asEagerSingleton(); + bind(KafkaProducerService.class).asEagerSingleton(); // The web servers are responsible for updating the ZooKeeper full consistency data. CLI tools don't need to. 
// Enable updating the ZooKeeper full consistency data if specified diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DefaultDataStore.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DefaultDataStore.java index dadbdd3b4e..7e82c275e2 100644 --- a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DefaultDataStore.java +++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DefaultDataStore.java @@ -5,29 +5,7 @@ import com.bazaarvoice.emodb.common.json.deferred.LazyJsonMap; import com.bazaarvoice.emodb.common.uuid.TimeUUIDs; import com.bazaarvoice.emodb.common.zookeeper.store.MapStore; -import com.bazaarvoice.emodb.sor.api.Audit; -import com.bazaarvoice.emodb.sor.api.AuditBuilder; -import com.bazaarvoice.emodb.sor.api.AuditsUnavailableException; -import com.bazaarvoice.emodb.sor.api.Change; -import com.bazaarvoice.emodb.sor.api.CompactionControlSource; -import com.bazaarvoice.emodb.sor.api.Coordinate; -import com.bazaarvoice.emodb.sor.api.DataStore; -import com.bazaarvoice.emodb.sor.api.DefaultTable; -import com.bazaarvoice.emodb.sor.api.FacadeOptions; -import com.bazaarvoice.emodb.sor.api.History; -import com.bazaarvoice.emodb.sor.api.Intrinsic; -import com.bazaarvoice.emodb.sor.api.Names; -import com.bazaarvoice.emodb.sor.api.ReadConsistency; -import com.bazaarvoice.emodb.sor.api.StashNotAvailableException; -import com.bazaarvoice.emodb.sor.api.StashRunTimeInfo; -import com.bazaarvoice.emodb.sor.api.StashTimeKey; -import com.bazaarvoice.emodb.sor.api.TableOptions; -import com.bazaarvoice.emodb.sor.api.UnknownPlacementException; -import com.bazaarvoice.emodb.sor.api.UnknownTableException; -import com.bazaarvoice.emodb.sor.api.UnpublishedDatabusEvent; -import com.bazaarvoice.emodb.sor.api.UnpublishedDatabusEventType; -import com.bazaarvoice.emodb.sor.api.Update; -import com.bazaarvoice.emodb.sor.api.WriteConsistency; +import com.bazaarvoice.emodb.sor.api.*; import com.bazaarvoice.emodb.sor.audit.AuditWriter; import 
com.bazaarvoice.emodb.sor.compactioncontrol.LocalCompactionControl; import com.bazaarvoice.emodb.sor.condition.Condition; @@ -42,7 +20,10 @@ import com.bazaarvoice.emodb.sor.db.ScanRange; import com.bazaarvoice.emodb.sor.db.ScanRangeSplits; import com.bazaarvoice.emodb.sor.delta.Delta; +import com.bazaarvoice.emodb.sor.kafka.KafkaConfig; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.sor.log.SlowQueryLog; +import com.bazaarvoice.emodb.sor.ssm.ParameterStoreUtil; import com.bazaarvoice.emodb.table.db.DroppedTableException; import com.bazaarvoice.emodb.table.db.StashBlackListTableCondition; import com.bazaarvoice.emodb.table.db.StashTableDAO; @@ -59,6 +40,8 @@ import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.base.Throwables; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.AbstractIterator; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterators; @@ -79,15 +62,7 @@ import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -104,6 +79,8 @@ public class DefaultDataStore implements DataStore, DataProvider, DataTools, Tab private static final int NUM_COMPACTION_THREADS = 2; private static final int MAX_COMPACTION_QUEUE_LENGTH = 100; + private static final String MASTER_FANOUT_TOPIC = "system_bus_master"; + private static final String DATA_THROTTLER = "databusThrottler"; private final Logger _log = 
LoggerFactory.getLogger(DefaultDataStore.class); @@ -126,6 +103,11 @@ public class DefaultDataStore implements DataStore, DataProvider, DataTools, Tab private final CompactionControlSource _compactionControlSource; private final MapStore _minSplitSizeMap; private final Clock _clock; + private final KafkaProducerService _kafkaProducerService; + private ParameterStoreUtil parameterStoreUtil; + private final Cache dataThrottlerCache = CacheBuilder.newBuilder() + .expireAfterWrite(1, TimeUnit.MINUTES) + .build(); private StashTableDAO _stashTableDao; @@ -134,10 +116,10 @@ public DefaultDataStore(LifeCycleRegistry lifeCycle, MetricRegistry metricRegist DataReaderDAO dataReaderDao, DataWriterDAO dataWriterDao, SlowQueryLog slowQueryLog, HistoryStore historyStore, @StashRoot Optional stashRootDirectory, @LocalCompactionControl CompactionControlSource compactionControlSource, @StashBlackListTableCondition Condition stashBlackListTableCondition, AuditWriter auditWriter, - @MinSplitSizeMap MapStore minSplitSizeMap, Clock clock) { + @MinSplitSizeMap MapStore minSplitSizeMap, Clock clock, KafkaProducerService kafkaProducerService) { this(eventWriterRegistry, tableDao, dataReaderDao, dataWriterDao, slowQueryLog, defaultCompactionExecutor(lifeCycle), historyStore, stashRootDirectory, compactionControlSource, stashBlackListTableCondition, auditWriter, - minSplitSizeMap, metricRegistry, clock); + minSplitSizeMap, metricRegistry, clock, kafkaProducerService); } @VisibleForTesting @@ -146,7 +128,7 @@ public DefaultDataStore(DatabusEventWriterRegistry eventWriterRegistry,TableDAO SlowQueryLog slowQueryLog, ExecutorService compactionExecutor, HistoryStore historyStore, Optional stashRootDirectory, CompactionControlSource compactionControlSource, Condition stashBlackListTableCondition, AuditWriter auditWriter, - MapStore minSplitSizeMap, MetricRegistry metricRegistry, Clock clock) { + MapStore minSplitSizeMap, MetricRegistry metricRegistry, Clock clock, KafkaProducerService 
kafkaProducerService) { _eventWriterRegistry = requireNonNull(eventWriterRegistry, "eventWriterRegistry"); _tableDao = requireNonNull(tableDao, "tableDao"); _dataReaderDao = requireNonNull(dataReaderDao, "dataReaderDao"); @@ -166,6 +148,8 @@ public DefaultDataStore(DatabusEventWriterRegistry eventWriterRegistry,TableDAO _compactionControlSource = requireNonNull(compactionControlSource, "compactionControlSource"); _minSplitSizeMap = requireNonNull(minSplitSizeMap, "minSplitSizeMap"); _clock = requireNonNull(clock, "clock"); + _kafkaProducerService = requireNonNull(kafkaProducerService, "kafkaProducerService"); + this.parameterStoreUtil = new ParameterStoreUtil(); } /** @@ -376,6 +360,35 @@ public AnnotatedContent apply(Record record) { }; } + /** + * Retrieves the value of the "DataThrottler" flag from the cache if available. + * If the value is not present in the cache or the cache has expired, it fetches the value + * from AWS Parameter Store and stores it in the cache. + *

+ * The cached value has a TTL (Time-To-Live) of 5 minutes, after which it will be refreshed + * from the Parameter Store on the next access. + *

+ * + * @return {@code true} if the experiment is still running, otherwise {@code false}. + * @throws RuntimeException if there is an error fetching the value from the cache or Parameter Store. + */ + private boolean getDataThrottlerValue() { + try { + String UNIVERSE = KafkaConfig.getUniverseFromEnv(); + // Attempt to retrieve from cache + return dataThrottlerCache.get(DATA_THROTTLER, () -> { + + Boolean checkDataThrottler = Boolean.parseBoolean(parameterStoreUtil.getParameter("/" + UNIVERSE + "/emodb/" + DATA_THROTTLER)); + _log.info("DATA_THROTTLER is refreshed {}", checkDataThrottler); + // If absent or expired, fetch from Parameter Store and cache the result + return checkDataThrottler; + }); + } catch (Exception e) { + _log.error("Error fetching databusThrottler valie{}", e.getMessage()); + return false; + } + } + /** * Resolve a set of changes read from the {@link DataWriterDAO} into a single JSON literal object + metadata. * If the record can be compacted an asynchronous compaction will be scheduled unless @@ -744,7 +757,10 @@ public void beforeWrite(Collection updateBatch) { } } if (!updateRefs.isEmpty()) { - _eventWriterRegistry.getDatabusWriter().writeEvents(updateRefs); + if(getDataThrottlerValue()) + _kafkaProducerService.sendMessages(MASTER_FANOUT_TOPIC, updateRefs, "update"); + else + _eventWriterRegistry.getDatabusWriter().writeEvents(updateRefs); } } @@ -1025,4 +1041,30 @@ private void decrementDeltaSizes(PendingCompaction pendingCompaction) { private String getMetricName(String name) { return MetricRegistry.name("bv.emodb.sor", "DefaultDataStore", name); } + + private Iterable convertToUpdateRef(Iterable apiUpdateRefs) { + List coreUpdateRefs = new ArrayList<>(); + for (UpdateRefModel apiUpdateRefModel : apiUpdateRefs) { + String tableName = apiUpdateRefModel.getTable(); + String key = apiUpdateRefModel.getKey(); + UUID changeId = apiUpdateRefModel.getChangeId(); + Set tags = apiUpdateRefModel.getTags(); + coreUpdateRefs.add(new 
com.bazaarvoice.emodb.sor.core.UpdateRef(tableName, key, changeId, tags)); + } + return coreUpdateRefs; + } + + @Override + public void updateRefInDatabus(Iterable updateRefs, Set tags, boolean isFacade) { + Iterator updateRefsIter = convertToUpdateRef(updateRefs).iterator(); + if (!updateRefsIter.hasNext()) { + return; + } + List updateRefList = new ArrayList<>(); + while (updateRefsIter.hasNext()) { + UpdateRef updateRef = updateRefsIter.next(); + updateRefList.add(updateRef); + } + _eventWriterRegistry.getDatabusWriter().writeEvents(updateRefList); + } } diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java index 37348e976b..8f6d9da161 100644 --- a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java +++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java @@ -6,6 +6,7 @@ import com.bazaarvoice.emodb.sor.core.DatabusEventWriterRegistry; import com.bazaarvoice.emodb.sor.core.DefaultDataStore; import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.sor.log.NullSlowQueryLog; import com.bazaarvoice.emodb.table.db.test.InMemoryTableDAO; import com.codahale.metrics.MetricRegistry; @@ -19,18 +20,19 @@ */ public class InMemoryDataStore extends DefaultDataStore { - public InMemoryDataStore(MetricRegistry metricRegistry) { - this(new InMemoryDataReaderDAO(), metricRegistry); + public InMemoryDataStore(MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService) { + this(new InMemoryDataReaderDAO(), metricRegistry, kafkaProducerService); } - public InMemoryDataStore(InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry) { - this(new DatabusEventWriterRegistry(), dataDao, metricRegistry); + + public InMemoryDataStore(InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry, 
KafkaProducerService kafkaProducerService) { + this(new DatabusEventWriterRegistry(), dataDao, metricRegistry, kafkaProducerService); } - public InMemoryDataStore(DatabusEventWriterRegistry eventWriterRegistry, InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry) { + public InMemoryDataStore(DatabusEventWriterRegistry eventWriterRegistry, InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService) { super(eventWriterRegistry, new InMemoryTableDAO(), dataDao, dataDao, new NullSlowQueryLog(), MoreExecutors.newDirectExecutorService(), new InMemoryHistoryStore(), Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), - new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC()); + new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC(), kafkaProducerService); } } diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java new file mode 100644 index 0000000000..a6518246b2 --- /dev/null +++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java @@ -0,0 +1,148 @@ +package com.bazaarvoice.emodb.sor.kafka; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement; +import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder; +import com.amazonaws.services.simplesystemsmanagement.model.AWSSimpleSystemsManagementException; +import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest; +import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult; +import com.amazonaws.services.simplesystemsmanagement.model.Parameter; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import 
org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +public class KafkaConfig { + private static String bootstrapServersConfig; + private static String batchSizeConfig; + private static String retriesConfig; + private static String lingerMsConfig; + private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class); + // Static SSM Client and configuration using AWS SDK v1 + private static final AWSSimpleSystemsManagement ssmClient = AWSSimpleSystemsManagementClientBuilder + .standard() + .build(); + + + static { + try { + final String UNIVERSE = getUniverseFromEnv(); + // Load configurations from SSM during static initialization + Map parameterValues = getParameterValues( + Arrays.asList( + "/" + UNIVERSE + "/emodb/kafka/batchSize", + "/" + UNIVERSE + "/emodb/kafka/retries", + "/" + UNIVERSE + "/emodb/kafka/lingerMs", + "/" + UNIVERSE + "/emodb/kafka/bootstrapServers" + ) + ); + + // Set configurations with fallback to defaults if not present + // Sets the batch size for Kafka producer, which controls the amount of data to batch before sending. + batchSizeConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/batchSize", "16384"); + + // Sets the number of retry attempts for failed Kafka message sends. + retriesConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/retries", "3"); + + // Sets the number of milliseconds a producer is willing to wait before sending a batch out + lingerMsConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/lingerMs", "1"); + + // Configures the Kafka broker addresses for producer connections. 
+ bootstrapServersConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/bootstrapServers", "localhost:9092"); + + logger.info("Kafka configurations loaded successfully from SSM."); + } catch (AmazonServiceException e) { + logger.error("Failed to load configurations from SSM. Using default values.", e); + throw e; + } + catch (Exception e) { + logger.error("Unexpected error occurred while loading configurations from SSM. Using default values.", e); + throw e; + } + } + + public static String getUniverseFromEnv() { + String filePath = "/etc/environment"; + logger.info("Reading environment file: " + filePath); + Properties environmentProps = new Properties(); + + try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) { + String line; + while ((line = reader.readLine()) != null) { + // Skip empty lines or comments + if (line.trim().isEmpty() || line.trim().startsWith("#")) { + continue; + } + // Split the line into key-value pair + String[] parts = line.split("=", 2); + logger.info("parts: " + Arrays.toString(parts)); + if (parts.length == 2) { + String key = parts[0].trim(); + String value = parts[1].trim(); + // Remove any surrounding quotes from value + value = value.replace("\"", ""); + environmentProps.put(key, value); + } + } + // Access the environment variables + return environmentProps.getProperty("UNIVERSE"); + } catch (IOException e) { + logger.error("Error reading environment file: " + e.getMessage()); + throw new RuntimeException("Error reading environment file: " + e.getMessage()); + } + } + // Fetch parameters from AWS SSM using AWS SDK v1 + private static Map getParameterValues(List parameterNames) { + try { + GetParametersRequest request = new GetParametersRequest() + .withNames(parameterNames) + .withWithDecryption(true); + + GetParametersResult response = ssmClient.getParameters(request); + + return response.getParameters().stream() + .collect(Collectors.toMap(Parameter::getName, Parameter::getValue)); + } catch 
(AWSSimpleSystemsManagementException e) { + logger.error("Error fetching parameters from SSM.", e); + throw e; // Rethrow or handle the exception if necessary + } + } + + // Kafka Producer properties + public static Properties getProducerProps() { + Properties producerProps = new Properties(); + + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); + producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retriesConfig)); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, Integer.parseInt(lingerMsConfig)); + producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.parseInt(batchSizeConfig)); + producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // Default buffer memory setting + logger.info("Kafka Producer properties initialized."); + return producerProps; + } + + // Ensure the SSM client is closed when the application shuts down + public static void shutdown() { + if (ssmClient != null) { + try { + ssmClient.shutdown(); + logger.info("SSM client closed successfully."); + } catch (Exception e) { + logger.error("Error while closing SSM client.", e); + } + } + } +} \ No newline at end of file diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaProducerService.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaProducerService.java new file mode 100644 index 0000000000..4cb587cf29 --- /dev/null +++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaProducerService.java @@ -0,0 +1,66 @@ +package com.bazaarvoice.emodb.sor.kafka; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Future; + +public class KafkaProducerService { + private static final Logger _log = LoggerFactory.getLogger(KafkaProducerService.class); + private final KafkaProducer producer; // Changed to String + + public KafkaProducerService() { + this.producer = new KafkaProducer<>(KafkaConfig.getProducerProps()); + _log.info("KafkaProducerService initialized with producer properties: {}", KafkaConfig.getProducerProps()); + } + + /** + * Sends each message from the collection to the specified Kafka topic separately. + * + * @param topic The Kafka topic. + * @param events The collection of messages to be sent. + */ + public void sendMessages(String topic, Collection events, String queueType) { + LocalDateTime startTime = LocalDateTime.now(); + _log.info("Sending {} messages to topic '{}'", events.size(), topic); + List> futures = new ArrayList<>(); + // Use async sendMessage and collect futures + for (T event : events) { + futures.add(producer.send(new ProducerRecord<>(topic, event.toString()))); + } + + // Wait for all futures to complete + for (Future future : futures) { + try { + future.get(); // Only blocks if a future is not yet complete + } catch (Exception e) { + _log.error("Error while sending message to Kafka: {}", e.getMessage()); + throw new RuntimeException("Error sending messages to Kafka", e); + } + } + _log.info("Finished sending messages to topic '{}' time taken : {} milliseconds", topic, Duration.between(startTime, LocalDateTime.now()).toMillis()); + } + + + /** + * Closes the producer to release resources. 
+ */ + public void close() { + _log.info("Closing Kafka producer."); + try { + producer.flush(); + producer.close(); + } catch (Exception e) { + _log.error("Error while closing Kafka producer: ", e); + throw e; + } + } +} \ No newline at end of file diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/ssm/ParameterStoreUtil.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/ssm/ParameterStoreUtil.java new file mode 100644 index 0000000000..693901b264 --- /dev/null +++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/ssm/ParameterStoreUtil.java @@ -0,0 +1,120 @@ +package com.bazaarvoice.emodb.sor.ssm; + +import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement; +import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder; +import com.amazonaws.services.simplesystemsmanagement.model.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Utility class for interacting with AWS Parameter Store using AWS SDK v1. + */ +public class ParameterStoreUtil { + + private static final Logger logger = LoggerFactory.getLogger(ParameterStoreUtil.class); + private final AWSSimpleSystemsManagement ssmClient; + + /** + * Constructor to initialize the SSM client + */ + public ParameterStoreUtil() { + // Create SSM client with default credentials and region + ssmClient = AWSSimpleSystemsManagementClientBuilder.standard() + .build(); + } + + /** + * Fetches a parameter from AWS Parameter Store. 
+ * + * @param parameterName The name of the parameter to fetch + * @return The value of the parameter + * @throws IllegalArgumentException If the parameterName is null or empty + */ + public String getParameter(String parameterName) { + if (parameterName == null || parameterName.isEmpty()) { + logger.error("Parameter name cannot be null or empty"); + throw new IllegalArgumentException("Parameter name cannot be null or empty"); + } + + try { + + GetParameterRequest request = new GetParameterRequest().withName(parameterName); + GetParameterResult result = ssmClient.getParameter(request); + return result.getParameter().getValue(); + + } catch (ParameterNotFoundException e) { + logger.error("Parameter not found: {}", parameterName, e); + throw new RuntimeException("Parameter not found: " + parameterName, e); + + } catch (AWSSimpleSystemsManagementException e) { + logger.error("Error fetching parameter from AWS SSM: {}", e.getMessage(), e); + throw new RuntimeException("Error fetching parameter from AWS SSM: " + parameterName, e); + + } catch (Exception e) { + logger.error("Unexpected error while fetching parameter: {}", parameterName, e); + throw new RuntimeException("Unexpected error fetching parameter: " + parameterName, e); + } + } + + /** + * Fetches multiple parameters from AWS Parameter Store in a batch. 
+ * + * @param parameterNames The list of parameter names to fetch + * @return A map of parameter names to their values + * @throws IllegalArgumentException If the parameterNames list is null or empty + */ + public Map getParameters(List parameterNames) { + if (parameterNames == null || parameterNames.isEmpty()) { + logger.error("Parameter names list cannot be null or empty"); + throw new IllegalArgumentException("Parameter names list cannot be null or empty"); + } + + try { + + GetParametersRequest request = new GetParametersRequest().withNames(parameterNames); + GetParametersResult result = ssmClient.getParameters(request); + + // Map the result to a Map of parameter names and values + Map parameters = new HashMap<>(); + result.getParameters().forEach(param -> parameters.put(param.getName(), param.getValue())); + + // Log any parameters that were not found + if (!result.getInvalidParameters().isEmpty()) { + logger.warn("The following parameters were not found: {}", result.getInvalidParameters()); + } + + return parameters; + + } catch (AWSSimpleSystemsManagementException e) { + logger.error("Error fetching parameters from AWS SSM: {}", e.getMessage(), e); + throw new RuntimeException("Error fetching parameters from AWS SSM: " + parameterNames, e); + + } catch (Exception e) { + logger.error("Unexpected error while fetching parameters: {}", parameterNames, e); + throw new RuntimeException("Unexpected error fetching parameters: " + parameterNames, e); + } + } + + public Long updateParameter(String key, String value) { + try { + if (key == null || key.trim().isEmpty()) { + logger.error("parameter name cannot be null or blank"); + throw new IllegalArgumentException("parameter name cannot be null or blank"); + } + + PutParameterRequest request = new PutParameterRequest().withName(key).withValue(value).withOverwrite(true); + + PutParameterResult response = ssmClient.putParameter(request); + logger.info("Successfully updated parameter: " + key + " with value: " + value + 
", Update Version: " + response.getVersion()); + return response.getVersion(); + } catch (Exception e) { + logger.error("Failed to update parameter: " + key + " with value: " + value, e); + throw new RuntimeException("Unexpected error updating parameter: " + key + " with value: " + value, e); + } + } + +} diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java index 48da779c69..442de20665 100644 --- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java +++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java @@ -16,6 +16,7 @@ import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO; import com.bazaarvoice.emodb.sor.delta.Delta; import com.bazaarvoice.emodb.sor.delta.Deltas; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.sor.test.SystemClock; import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs; import com.bazaarvoice.emodb.table.db.Table; @@ -485,7 +486,7 @@ public void compact(Table table, String key, UUID compactionKey, Compaction comp } }; - final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry()); + final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry(), mock(KafkaProducerService.class)); // Create a table for our test dataStore.createTable(tableName, @@ -571,7 +572,7 @@ public Record read(Key key, ReadConsistency ignored) { // Configure the data DAO to read 10 columns initially, causing other column reads to be read lazily dataDAO.setColumnBatchSize(10); - final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry()); + final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry(), mock(KafkaProducerService.class)); // Create a table for our test dataStore.createTable(tableName, diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java 
b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java index 3cf9b7b50f..bb66c772fd 100644 --- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java +++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java @@ -14,6 +14,7 @@ import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore; import com.bazaarvoice.emodb.sor.delta.Delta; import com.bazaarvoice.emodb.sor.delta.Deltas; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.sor.test.SystemClock; import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs; import com.codahale.metrics.MetricRegistry; @@ -33,6 +34,7 @@ import java.util.Set; import java.util.UUID; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -47,7 +49,7 @@ public class DataStoreTest { @Test public void testDeltas() throws Exception { - DataStore store = new InMemoryDataStore(new MetricRegistry()); + DataStore store = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class)); TableOptions options = new TableOptionsBuilder().setPlacement("default").build(); assertFalse(store.getTableExists(TABLE)); @@ -167,7 +169,7 @@ public void testDeltas() throws Exception { @Test public void testRecordTimestamps() throws Exception { - DataStore store = new InMemoryDataStore(new MetricRegistry()); + DataStore store = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class)); TableOptions options = new TableOptionsBuilder().setPlacement("default").build(); assertFalse(store.getTableExists(TABLE)); @@ -262,7 +264,7 @@ record = store.get(TABLE, KEY1); @Test public void testRecordTimestampsWithEventTags() throws Exception { - DataStore store = new InMemoryDataStore(new MetricRegistry()); + DataStore store = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class)); TableOptions options = new 
TableOptionsBuilder().setPlacement("default").build(); assertFalse(store.getTableExists(TABLE)); diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java index ac585fa220..6030144919 100644 --- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java +++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java @@ -10,6 +10,7 @@ import com.bazaarvoice.emodb.sor.db.astyanax.ChangeEncoder; import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO; import com.bazaarvoice.emodb.sor.delta.Deltas; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs; import com.bazaarvoice.emodb.table.db.Table; import com.bazaarvoice.emodb.table.db.astyanax.PlacementCache; @@ -43,7 +44,7 @@ public List getSplits(Table table, int recordsPerSplit, int localResplit } }; - DataStore dataStore = new InMemoryDataStore(dataDao, new MetricRegistry()); + DataStore dataStore = new InMemoryDataStore(dataDao, new MetricRegistry(), mock(KafkaProducerService.class)); dataStore.createTable("table", new TableOptionsBuilder().setPlacement("default").build(), Collections.emptyMap(), new AuditBuilder().build()); diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java index 7377838dc5..36bdcdf8e4 100644 --- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java +++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java @@ -24,6 +24,7 @@ import com.bazaarvoice.emodb.sor.db.test.DeltaClusteringKey; import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO; import com.bazaarvoice.emodb.sor.delta.Deltas; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.sor.log.NullSlowQueryLog; import com.bazaarvoice.emodb.table.db.Table; import 
com.bazaarvoice.emodb.table.db.test.InMemoryTableDAO; @@ -65,7 +66,7 @@ public void testRedundantDeltas() throws Exception { DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao, new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(), Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), - new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC()); + new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class)); TableOptions options = new TableOptionsBuilder().setPlacement("default").build(); store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table")); @@ -122,7 +123,7 @@ public void testMinUUIDDelta() throws Exception { DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao, new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(), Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), - new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC()); + new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class)); TableOptions options = new TableOptionsBuilder().setPlacement("default").build(); store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table")); @@ -159,7 +160,7 @@ public void testRedundancyWithTags() throws Exception { DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao, new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(), Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), - new DiscardingAuditWriter(), new InMemoryMapStore<>(), new 
MetricRegistry(), Clock.systemUTC()); + new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class)); TableOptions options = new TableOptionsBuilder().setPlacement("default").build(); store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table")); @@ -240,7 +241,7 @@ public void testTagsForNestedMapDeltas() { DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao, new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(), Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), - new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC()); + new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class)); TableOptions options = new TableOptionsBuilder().setPlacement("default").build(); store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table")); @@ -260,7 +261,7 @@ public void testRedundancyWithCompactionAndUnchangedTag() throws Exception { DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao, new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(), Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), - new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC()); + new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class)); TableOptions options = new TableOptionsBuilder().setPlacement("default").build(); store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table")); @@ -337,7 +338,7 @@ public void testPartialCompactionWithNoRedundancy() throws Exception { DefaultDataStore store = new 
DefaultDataStore(new DatabusEventWriterRegistry(), tableDao, dataDao, dataDao, new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(), Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), - new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC()); + new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class)); TableOptions options = new TableOptionsBuilder().setPlacement("default").build(); store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table")); @@ -409,7 +410,7 @@ public void testPartialCompactionWithRedundancy() throws Exception { DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), tableDao, dataDao, dataDao, new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(), Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), - new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC()); + new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class)); TableOptions options = new TableOptionsBuilder().setPlacement("default").build(); store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table")); diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java index 60574f680c..047c373f72 100644 --- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java +++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java @@ -8,6 +8,7 @@ import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore; import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO; import com.bazaarvoice.emodb.sor.delta.Deltas; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; 
import com.bazaarvoice.emodb.sor.test.SystemClock; import com.codahale.metrics.MetricRegistry; import com.google.common.collect.ImmutableMap; @@ -19,6 +20,7 @@ import java.util.UUID; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; @@ -34,7 +36,7 @@ public class SorUpdateTest { public void SetupTest() { final InMemoryDataReaderDAO dataDAO = new InMemoryDataReaderDAO(); _eventWriterRegistry = new DatabusEventWriterRegistry(); - _dataStore = new InMemoryDataStore(_eventWriterRegistry, dataDAO, new MetricRegistry()); + _dataStore = new InMemoryDataStore(_eventWriterRegistry, dataDAO, new MetricRegistry(), mock(KafkaProducerService.class)); // Create a table for our test diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java index c67f985342..66e4d5fe2b 100644 --- a/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java +++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java @@ -11,6 +11,7 @@ import com.bazaarvoice.emodb.sor.core.test.InMemoryHistoryStore; import com.bazaarvoice.emodb.sor.core.test.InMemoryMapStore; import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.sor.log.NullSlowQueryLog; import com.bazaarvoice.emodb.table.db.TableDAO; import com.bazaarvoice.emodb.table.db.test.InMemoryTableDAO; @@ -20,6 +21,8 @@ import java.time.Clock; import java.util.Optional; +import static org.mockito.Mockito.mock; + /** * Wrapper around a set of {@link DataStore} instances that replicate to each other, * simulating a set of eventually consistent data centers. 
@@ -63,12 +66,12 @@ public MultiDCDataStores(int numDCs, boolean asyncCompacter, MetricRegistry metr if (asyncCompacter) { _stores[i] = new DefaultDataStore(new SimpleLifeCycleRegistry(), metricRegistry, new DatabusEventWriterRegistry(), _tableDao, _inMemoryDaos[i].setHistoryStore(_historyStores[i]), _replDaos[i], new NullSlowQueryLog(), _historyStores[i], - Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), new DiscardingAuditWriter(), new InMemoryMapStore<>(), Clock.systemUTC()); + Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), new DiscardingAuditWriter(), new InMemoryMapStore<>(), Clock.systemUTC(), mock(KafkaProducerService.class)); } else { _stores[i] = new DefaultDataStore(new DatabusEventWriterRegistry(), _tableDao, _inMemoryDaos[i].setHistoryStore(_historyStores[i]), _replDaos[i], new NullSlowQueryLog(), MoreExecutors.newDirectExecutorService(), _historyStores[i], Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), - new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC()); + new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC(), mock(KafkaProducerService.class)); } } } diff --git a/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java b/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java index 5ad5ff357f..f86ba00818 100644 --- a/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java +++ b/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java @@ -27,6 +27,7 @@ import com.bazaarvoice.emodb.sor.delta.Delta; import com.bazaarvoice.emodb.sor.delta.Deltas; import com.bazaarvoice.emodb.sor.delta.MapDeltaBuilder; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.table.db.MoveType; import com.bazaarvoice.emodb.table.db.Table; import 
com.bazaarvoice.emodb.table.db.TableBackingStore; @@ -1981,7 +1982,7 @@ dataCenter, mock(RateLimiterCache.class), dataCopyDAO, dataPurgeDAO, } private InMemoryDataStore newBackingStore(MetricRegistry metricRegistry) { - InMemoryDataStore store = new InMemoryDataStore(metricRegistry); + InMemoryDataStore store = new InMemoryDataStore(metricRegistry, mock(KafkaProducerService.class)); store.createTable("__system:table", newOptions(PL_GLOBAL), ImmutableMap.of(), newAudit()); store.createTable("__system:table_uuid", newOptions(PL_GLOBAL), ImmutableMap.of(), newAudit()); store.createTable("__system:table_unpublished_databus_events", newOptions(PL_GLOBAL), ImmutableMap.of(), newAudit()); diff --git a/table/pom.xml b/table/pom.xml index c0c8d86ce2..241962d945 100644 --- a/table/pom.xml +++ b/table/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/uac-api/pom.xml b/uac-api/pom.xml index 1298413f2b..399e45aef5 100644 --- a/uac-api/pom.xml +++ b/uac-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/uac-client-jersey2/pom.xml b/uac-client-jersey2/pom.xml index 392bbb0d28..67fd9c73b2 100644 --- a/uac-client-jersey2/pom.xml +++ b/uac-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/uac-client/pom.xml b/uac-client/pom.xml index e583ba615c..45cbb28f45 100644 --- a/uac-client/pom.xml +++ b/uac-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/web-local/pom.xml b/web-local/pom.xml index 33013ccfd8..f2e21d8d68 100644 --- a/web-local/pom.xml +++ b/web-local/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/web/pom.xml b/web/pom.xml index 8bc7cd02ef..a3e65691c2 100644 --- 
a/web/pom.xml +++ b/web/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/AbstractSubjectDatabus.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/AbstractSubjectDatabus.java index 8e9f9b2e21..7c73fd693d 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/AbstractSubjectDatabus.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/AbstractSubjectDatabus.java @@ -61,6 +61,11 @@ public long getEventCountUpTo(Subject subject, @PartitionKey String subscription return databus(subject).getEventCountUpTo(subscription, limit); } + @Override + public long getMasterCount(Subject subject){ + return databus(subject).getMasterCount(); + } + @Override public long getClaimCount(Subject subject, @PartitionKey String subscription) { return databus(subject).getClaimCount(subscription); diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/DatabusResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/DatabusResource1.java index ca356d0ac9..91e23b90e9 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/DatabusResource1.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/DatabusResource1.java @@ -173,6 +173,18 @@ public long getEventCount(@QueryParam ("partitioned") BooleanParam partitioned, } } + @GET + @Path ("{channel}/uncached_size") + @Timed (name = "bv.emodb.databus.DatabusResource1.getMasterEventCount", absolute = true) + @ApiOperation (value = "Gets the master event count.", + notes = "Returns a long.", + response = long.class + ) + public long getEventCountInMaster(@QueryParam ("partitioned") BooleanParam partitioned, + @Authenticated Subject subject) { + return getClient(partitioned).getMasterCount(subject); + } + @GET @Path ("{subscription}/claimcount") @RequiresPermissions 
("databus|get_status|{subscription}") diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/SubjectDatabus.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/SubjectDatabus.java index 44493228e9..bcac1c3e8b 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/SubjectDatabus.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/SubjectDatabus.java @@ -52,6 +52,8 @@ Subscription getSubscription(Subject subject, String subscription) long getEventCountUpTo(Subject subject, String subscription, long limit); + long getMasterCount(Subject subject); + long getClaimCount(Subject subject, String subscription); Iterator peek(Subject subject, String subscription, int limit); diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java index e0a312c623..0e94b1100f 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java @@ -6,6 +6,9 @@ import com.bazaarvoice.emodb.queue.api.MoveQueueStatus; import com.bazaarvoice.emodb.queue.api.QueueService; import com.bazaarvoice.emodb.queue.client.QueueServiceAuthenticator; +import com.bazaarvoice.emodb.queue.core.Entities.QueueExecutionAttributes; +import com.bazaarvoice.emodb.queue.core.ssm.ParameterStoreUtil; +import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; import com.bazaarvoice.emodb.web.auth.Permissions; import com.bazaarvoice.emodb.web.auth.resource.NamedResource; import com.bazaarvoice.emodb.web.jersey.params.SecondsParam; @@ -28,6 +31,7 @@ import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; +import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; @@ -63,11 +67,17 @@ public class QueueResource1 { private final Meter 
_sendBatchNull_qr1; + private final ParameterStoreUtil _parameterStoreUtil; + + private final StepFunctionService _stepFunctionService; + public QueueResource1(QueueService queueService, QueueServiceAuthenticator queueClient, MetricRegistry metricRegistry) { //this._metricRegistry = metricRegistry; _queueService = requireNonNull(queueService, "queueService"); _queueClient = requireNonNull(queueClient, "queueClient"); + _parameterStoreUtil = new ParameterStoreUtil(); + _stepFunctionService = new StepFunctionService(); _messageCount_qr1 = metricRegistry.meter(MetricRegistry.name(QueueResource1.class, "polledMessageCount_qr1")); _nullPollsCount_qr1 = metricRegistry.meter(MetricRegistry.name(QueueResource1.class, "nullPollsCount_qr1")); _sendCount_qr1= metricRegistry.meter(MetricRegistry.name(QueueResource1.class,"sendCount_qr1")); @@ -341,6 +351,36 @@ public SuccessResponse purge(@QueryParam("partitioned") BooleanParam partitioned return SuccessResponse.instance(); } + @PUT + @Path("/UpdateParameterStore") + @Consumes(MediaType.APPLICATION_JSON) + @ApiOperation (value = "update param operation at aws parameter store .", + notes = "Returns a SuccessResponse.", response = SuccessResponse.class) + public SuccessResponse updateParam(Map keyValuePair) { + String key = keyValuePair.keySet().iterator().next(); + String value = keyValuePair.get(key); + + Long update_version = _parameterStoreUtil.updateParameter(key, value); + return SuccessResponse.instance().with(ImmutableMap.of("status", "200 | ssm-parameter updated successfully, update_version: " + update_version)); + } + + @PUT + @Path("/{queue_type}/{queue_name}/QueueExecutionAttributes") + @RequiresPermissions("queue|poll|{queue_name}") + @Consumes(MediaType.APPLICATION_JSON) + @ApiOperation (value = "update queue execution attributes .", notes = "Returns a SuccessResponse.", response = SuccessResponse.class) + public SuccessResponse updateQueueExecutionAttributes(@PathParam("queue_type") String queueType, 
@PathParam("queue_name") String queueName, QueueExecutionAttributes newExecAttributes) { + newExecAttributes.setQueueName(queueName); + newExecAttributes.setQueueType(queueType); + _stepFunctionService.startSFNWithAttributes(newExecAttributes); + + if("DISABLED".equals(newExecAttributes.getStatus())) { + return SuccessResponse.instance().with(ImmutableMap.of("status", "200 | step function successfully stopped(if any execution existed) as status=DISABLED was provided")); + } else { + return SuccessResponse.instance().with(ImmutableMap.of("status", "200 | step function successfully re-started, or started with updated attributes")); + } + } + private QueueService getService(BooleanParam partitioned, String apiKey) { return partitioned != null && partitioned.get() ? _queueService : _queueClient.usingCredentials(apiKey); } diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/sor/DataStoreResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/sor/DataStoreResource1.java index 27baf89a04..2efd03733e 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/sor/DataStoreResource1.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/sor/DataStoreResource1.java @@ -8,19 +8,7 @@ import com.bazaarvoice.emodb.common.json.OrderedJson; import com.bazaarvoice.emodb.common.uuid.TimeUUIDs; import com.bazaarvoice.emodb.datacenter.api.DataCenter; -import com.bazaarvoice.emodb.sor.api.Audit; -import com.bazaarvoice.emodb.sor.api.Change; -import com.bazaarvoice.emodb.sor.api.CompactionControlSource; -import com.bazaarvoice.emodb.sor.api.Coordinate; -import com.bazaarvoice.emodb.sor.api.DataStore; -import com.bazaarvoice.emodb.sor.api.FacadeOptions; -import com.bazaarvoice.emodb.sor.api.Intrinsic; -import com.bazaarvoice.emodb.sor.api.PurgeStatus; -import com.bazaarvoice.emodb.sor.api.Table; -import com.bazaarvoice.emodb.sor.api.TableOptions; -import com.bazaarvoice.emodb.sor.api.UnpublishedDatabusEvent; -import 
com.bazaarvoice.emodb.sor.api.Update; -import com.bazaarvoice.emodb.sor.api.WriteConsistency; +import com.bazaarvoice.emodb.sor.api.*; import com.bazaarvoice.emodb.sor.core.DataStoreAsync; import com.bazaarvoice.emodb.sor.delta.Delta; import com.bazaarvoice.emodb.sor.delta.Deltas; @@ -947,6 +935,59 @@ private Response redirectTo(DataCenter dataCenter, URI requestUri) { build(); } + @POST + @Path("{channel}/sendbatch1") + @Consumes(MediaType.APPLICATION_JSON) + @Timed(name = "bv.emodb.sor.DataStoreResource1.updateRef", absolute = true) + @ApiOperation(value = "Updates a reference", + notes = "Updates a reference", + response = SuccessResponse.class + ) + public SuccessResponse updateRefToDatabus(InputStream in, + @QueryParam("consistency") @DefaultValue("STRONG") WriteConsistencyParam consistency, + @QueryParam("tag") List tags, + @Authenticated Subject subject) { + Set tagsSet = (tags == null) ? ImmutableSet.of() : Sets.newHashSet(tags); + Iterable updateRefs = asSubjectSafeUpdateRefModelIterable(new JsonStreamingArrayParser<>(in, UpdateRefModel.class), subject, true); + // Perform the update by writing to Databus + _dataStore.updateRefInDatabus(updateRefs, tagsSet, false); + return SuccessResponse.instance(); + } + + /** + * Takes an update ref stream from a subject and performs the following actions on it: + * 1. Checks that the subject has permission to update the record being updated + * 2. 
Applies any active rate limiting for updates by the subject + */ + private Iterable asSubjectSafeUpdateRefModelIterable(Iterator updateRefs, final Subject subject, final boolean isFacade) { + return Iterables.filter( + OneTimeIterable.wrap(updateRefs), + new Predicate() { + @Override + public boolean apply(UpdateRefModel updateRefModel) { + NamedResource resource = new NamedResource(updateRefModel.getTable()); + boolean hasPermission; + if (isFacade) { + hasPermission = subject.hasPermission(Permissions.updateFacade(resource)); + } else { + hasPermission = subject.hasPermission(Permissions.updateSorTable(resource)); + } + + if (!hasPermission) { + throw new UnauthorizedException("not authorized to update table " + updateRefModel.getTable()); + } + + // Facades are a unique case used internally for shoveling data across data centers, so don't rate + // limit facade updates. + if (!isFacade) { + _updateThrottle.beforeUpdate(subject.getId()); + } + + return true; + } + }); + } + private UUID parseUuidOrTimestamp(@Nullable String string, boolean rangeUpperEnd) { if (string == null) { return null; diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/purge/PurgeTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/purge/PurgeTest.java index fd11acab47..d0d45f8435 100644 --- a/web/src/test/java/com/bazaarvoice/emodb/web/purge/PurgeTest.java +++ b/web/src/test/java/com/bazaarvoice/emodb/web/purge/PurgeTest.java @@ -24,6 +24,7 @@ import com.bazaarvoice.emodb.sor.core.PurgeResult; import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore; import com.bazaarvoice.emodb.sor.delta.Deltas; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.web.resources.sor.AuditParam; import com.bazaarvoice.emodb.web.resources.sor.DataStoreResource1; import com.bazaarvoice.emodb.web.throttling.UnlimitedDataStoreUpdateThrottler; @@ -84,7 +85,7 @@ public void setUp() throws Exception { lifeCycleRegistry, _queueService, "testqueue", 
_jobHandlerRegistry, _jobStatusDAO, _curator, 1, Duration.ZERO, 100, Duration.ofHours(1)); - _store = new InMemoryDataStore(new MetricRegistry()); + _store = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class)); _dataStoreResource = new DataStoreResource1(_store, new DefaultDataStoreAsync(_store, _service, _jobHandlerRegistry), mock(CompactionControlSource.class), new UnlimitedDataStoreUpdateThrottler()); diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/ScanUploaderTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/ScanUploaderTest.java index a96563df8d..03c4b58787 100644 --- a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/ScanUploaderTest.java +++ b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/ScanUploaderTest.java @@ -27,6 +27,7 @@ import com.bazaarvoice.emodb.sor.db.Record; import com.bazaarvoice.emodb.sor.db.ScanRange; import com.bazaarvoice.emodb.sor.db.ScanRangeSplits; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.table.db.Table; import com.bazaarvoice.emodb.table.db.astyanax.AstyanaxStorage; import com.bazaarvoice.emodb.web.scanner.control.DistributedScanRangeMonitor; @@ -420,8 +421,9 @@ dataTools, scanWriterGenerator, compactionControlSource, mock(LifeCycleRegistry. 
@Test public void testScanUploadFromExistingScan() throws Exception { MetricRegistry metricRegistry = new MetricRegistry(); + KafkaProducerService kafkaProducerService = mock(KafkaProducerService.class); // Use an in-memory data store but override the default splits operation to return 4 splits for the test placement - InMemoryDataStore dataStore = spy(new InMemoryDataStore(metricRegistry)); + InMemoryDataStore dataStore = spy(new InMemoryDataStore(metricRegistry, kafkaProducerService)); when(dataStore.getScanRangeSplits("app_global:default", 1000000, Optional.empty())) .thenReturn(new ScanRangeSplits(ImmutableList.of( createSimpleSplitGroup("00", "40"), @@ -621,7 +623,7 @@ public void testScanFailureRecovery() Lists.newArrayList(), Lists.newArrayList()); InMemoryScanWorkflow scanWorkflow = new InMemoryScanWorkflow(); - ScanStatusDAO scanStatusDAO = new DataStoreScanStatusDAO(new InMemoryDataStore(new MetricRegistry()), "scan_table", "app_global:sys"); + ScanStatusDAO scanStatusDAO = new DataStoreScanStatusDAO(new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class)), "scan_table", "app_global:sys"); LocalScanUploadMonitor monitor = new LocalScanUploadMonitor(scanWorkflow, scanStatusDAO, mock(ScanWriterGenerator.class), mock(StashStateListener.class), mock(ScanCountListener.class), mock(DataTools.class), new InMemoryCompactionControlSource(), mock(DataCenters.class)); diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreScanStatusDAOTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreScanStatusDAOTest.java index 0c19fa2c14..0fedee1486 100644 --- a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreScanStatusDAOTest.java +++ b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreScanStatusDAOTest.java @@ -6,6 +6,7 @@ import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore; import com.bazaarvoice.emodb.sor.db.ScanRange; import 
com.bazaarvoice.emodb.sor.delta.Deltas; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.web.scanner.ScanDestination; import com.bazaarvoice.emodb.web.scanner.ScanOptions; import com.codahale.metrics.MetricRegistry; @@ -22,6 +23,7 @@ import java.util.List; import java.util.Optional; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -33,7 +35,7 @@ public class DataStoreScanStatusDAOTest { @BeforeMethod public void setUp() { - _dataStore = new InMemoryDataStore(new MetricRegistry()); + _dataStore = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class)); _dao = new DataStoreScanStatusDAO(_dataStore, "scan_table", "app_global:sys"); } diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreStashRequestDAOTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreStashRequestDAOTest.java index 4e14f50fd1..aab4241cae 100644 --- a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreStashRequestDAOTest.java +++ b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreStashRequestDAOTest.java @@ -2,6 +2,7 @@ import com.bazaarvoice.emodb.sor.api.DataStore; import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.codahale.metrics.MetricRegistry; import com.google.common.collect.ImmutableSet; import org.testng.annotations.BeforeMethod; @@ -9,6 +10,7 @@ import java.util.Date; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -19,7 +21,7 @@ public class DataStoreStashRequestDAOTest { @BeforeMethod public void setUp() { - _dataStore = new InMemoryDataStore(new MetricRegistry()); + _dataStore = new InMemoryDataStore(new MetricRegistry(), 
mock(KafkaProducerService.class)); _dao = new DataStoreStashRequestDAO(_dataStore, "request_table", "app_global:sys"); } diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/settings/SettingsManagerTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/settings/SettingsManagerTest.java index f8c5758a07..fd7249564f 100644 --- a/web/src/test/java/com/bazaarvoice/emodb/web/settings/SettingsManagerTest.java +++ b/web/src/test/java/com/bazaarvoice/emodb/web/settings/SettingsManagerTest.java @@ -6,6 +6,7 @@ import com.bazaarvoice.emodb.sor.api.DataStore; import com.bazaarvoice.emodb.sor.api.Intrinsic; import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.codahale.metrics.MetricRegistry; import com.google.common.cache.Cache; import com.google.inject.util.Providers; @@ -32,7 +33,7 @@ public class SettingsManagerTest { @BeforeMethod public void setUp() { - _dataStore = new InMemoryDataStore(new MetricRegistry()); + _dataStore = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class)); _cacheRegistry = mock(CacheRegistry.class); _cacheHandle = mock(CacheHandle.class); when(_cacheRegistry.register(eq("settings"), any(Cache.class), eq(true))).thenReturn(_cacheHandle); diff --git a/yum/pom.xml b/yum/pom.xml index b4c6f8e3a5..d82d25e11b 100644 --- a/yum/pom.xml +++ b/yum/pom.xml @@ -4,7 +4,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.205-SNAPSHOT + 6.5.209-SNAPSHOT ../parent/pom.xml