minSplitSizeMap, MetricRegistry metricRegistry, Clock clock, KafkaProducerService kafkaProducerService) {
_eventWriterRegistry = requireNonNull(eventWriterRegistry, "eventWriterRegistry");
_tableDao = requireNonNull(tableDao, "tableDao");
_dataReaderDao = requireNonNull(dataReaderDao, "dataReaderDao");
@@ -166,6 +148,8 @@ public DefaultDataStore(DatabusEventWriterRegistry eventWriterRegistry,TableDAO
_compactionControlSource = requireNonNull(compactionControlSource, "compactionControlSource");
_minSplitSizeMap = requireNonNull(minSplitSizeMap, "minSplitSizeMap");
_clock = requireNonNull(clock, "clock");
+ _kafkaProducerService = requireNonNull(kafkaProducerService, "kafkaProducerService");
+ this.parameterStoreUtil = new ParameterStoreUtil();
}
/**
@@ -376,6 +360,35 @@ public AnnotatedContent apply(Record record) {
};
}
+ /**
+ * Retrieves the value of the "DataThrottler" flag from the cache if available.
+ * If the value is not present in the cache or the cache has expired, it fetches the value
+ * from AWS Parameter Store and stores it in the cache.
+ *
+ * The cached value has a TTL (Time-To-Live) of 5 minutes, after which it will be refreshed
+ * from the Parameter Store on the next access.
+ *
+ *
+ * @return {@code true} if the experiment is still running, otherwise {@code false}.
+ * @throws RuntimeException if there is an error fetching the value from the cache or Parameter Store.
+ */
+ private boolean getDataThrottlerValue() {
+ try {
+ String UNIVERSE = KafkaConfig.getUniverseFromEnv();
+ // Attempt to retrieve from cache
+ return dataThrottlerCache.get(DATA_THROTTLER, () -> {
+
+ Boolean checkDataThrottler = Boolean.parseBoolean(parameterStoreUtil.getParameter("/" + UNIVERSE + "/emodb/" + DATA_THROTTLER));
+ _log.info("DATA_THROTTLER is refreshed {}", checkDataThrottler);
+ // If absent or expired, fetch from Parameter Store and cache the result
+ return checkDataThrottler;
+ });
+ } catch (Exception e) {
+ _log.error("Error fetching databusThrottler valie{}", e.getMessage());
+ return false;
+ }
+ }
+
/**
* Resolve a set of changes read from the {@link DataWriterDAO} into a single JSON literal object + metadata.
* If the record can be compacted an asynchronous compaction will be scheduled unless
@@ -744,7 +757,10 @@ public void beforeWrite(Collection updateBatch) {
}
}
if (!updateRefs.isEmpty()) {
- _eventWriterRegistry.getDatabusWriter().writeEvents(updateRefs);
+ if(getDataThrottlerValue())
+ _kafkaProducerService.sendMessages(MASTER_FANOUT_TOPIC, updateRefs, "update");
+ else
+ _eventWriterRegistry.getDatabusWriter().writeEvents(updateRefs);
}
}
@@ -1025,4 +1041,30 @@ private void decrementDeltaSizes(PendingCompaction pendingCompaction) {
private String getMetricName(String name) {
return MetricRegistry.name("bv.emodb.sor", "DefaultDataStore", name);
}
+
+ private Iterable convertToUpdateRef(Iterable apiUpdateRefs) {
+ List coreUpdateRefs = new ArrayList<>();
+ for (UpdateRefModel apiUpdateRefModel : apiUpdateRefs) {
+ String tableName = apiUpdateRefModel.getTable();
+ String key = apiUpdateRefModel.getKey();
+ UUID changeId = apiUpdateRefModel.getChangeId();
+ Set tags = apiUpdateRefModel.getTags();
+ coreUpdateRefs.add(new com.bazaarvoice.emodb.sor.core.UpdateRef(tableName, key, changeId, tags));
+ }
+ return coreUpdateRefs;
+ }
+
+ @Override
+ public void updateRefInDatabus(Iterable updateRefs, Set tags, boolean isFacade) {
+ Iterator updateRefsIter = convertToUpdateRef(updateRefs).iterator();
+ if (!updateRefsIter.hasNext()) {
+ return;
+ }
+ List updateRefList = new ArrayList<>();
+ while (updateRefsIter.hasNext()) {
+ UpdateRef updateRef = updateRefsIter.next();
+ updateRefList.add(updateRef);
+ }
+ _eventWriterRegistry.getDatabusWriter().writeEvents(updateRefList);
+ }
}
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java
index 37348e976b..8f6d9da161 100644
--- a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java
@@ -6,6 +6,7 @@
import com.bazaarvoice.emodb.sor.core.DatabusEventWriterRegistry;
import com.bazaarvoice.emodb.sor.core.DefaultDataStore;
import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.log.NullSlowQueryLog;
import com.bazaarvoice.emodb.table.db.test.InMemoryTableDAO;
import com.codahale.metrics.MetricRegistry;
@@ -19,18 +20,19 @@
*/
public class InMemoryDataStore extends DefaultDataStore {
- public InMemoryDataStore(MetricRegistry metricRegistry) {
- this(new InMemoryDataReaderDAO(), metricRegistry);
+ public InMemoryDataStore(MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService) {
+ this(new InMemoryDataReaderDAO(), metricRegistry, kafkaProducerService);
}
- public InMemoryDataStore(InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry) {
- this(new DatabusEventWriterRegistry(), dataDao, metricRegistry);
+
+ public InMemoryDataStore(InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService) {
+ this(new DatabusEventWriterRegistry(), dataDao, metricRegistry, kafkaProducerService);
}
- public InMemoryDataStore(DatabusEventWriterRegistry eventWriterRegistry, InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry) {
+ public InMemoryDataStore(DatabusEventWriterRegistry eventWriterRegistry, InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService) {
super(eventWriterRegistry, new InMemoryTableDAO(), dataDao, dataDao,
new NullSlowQueryLog(), MoreExecutors.newDirectExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC(), kafkaProducerService);
}
}
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java
new file mode 100644
index 0000000000..a6518246b2
--- /dev/null
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java
@@ -0,0 +1,148 @@
+package com.bazaarvoice.emodb.sor.kafka;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder;
+import com.amazonaws.services.simplesystemsmanagement.model.AWSSimpleSystemsManagementException;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult;
+import com.amazonaws.services.simplesystemsmanagement.model.Parameter;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class KafkaConfig {
+ private static String bootstrapServersConfig;
+ private static String batchSizeConfig;
+ private static String retriesConfig;
+ private static String lingerMsConfig;
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
+ // Static SSM Client and configuration using AWS SDK v1
+ private static final AWSSimpleSystemsManagement ssmClient = AWSSimpleSystemsManagementClientBuilder
+ .standard()
+ .build();
+
+
+ static {
+ try {
+ final String UNIVERSE = getUniverseFromEnv();
+ // Load configurations from SSM during static initialization
+ Map parameterValues = getParameterValues(
+ Arrays.asList(
+ "/" + UNIVERSE + "/emodb/kafka/batchSize",
+ "/" + UNIVERSE + "/emodb/kafka/retries",
+ "/" + UNIVERSE + "/emodb/kafka/lingerMs",
+ "/" + UNIVERSE + "/emodb/kafka/bootstrapServers"
+ )
+ );
+
+ // Set configurations with fallback to defaults if not present
+ // Sets the batch size for Kafka producer, which controls the amount of data to batch before sending.
+ batchSizeConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/batchSize", "16384");
+
+ // Sets the number of retry attempts for failed Kafka message sends.
+ retriesConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/retries", "3");
+
+ // Sets the number of milliseconds a producer is willing to wait before sending a batch out
+ lingerMsConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/lingerMs", "1");
+
+ // Configures the Kafka broker addresses for producer connections.
+ bootstrapServersConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/bootstrapServers", "localhost:9092");
+
+ logger.info("Kafka configurations loaded successfully from SSM.");
+ } catch (AmazonServiceException e) {
+ logger.error("Failed to load configurations from SSM. Using default values.", e);
+ throw e;
+ }
+ catch (Exception e) {
+ logger.error("Unexpected error occurred while loading configurations from SSM. Using default values.", e);
+ throw e;
+ }
+ }
+
+ public static String getUniverseFromEnv() {
+ String filePath = "/etc/environment";
+ logger.info("Reading environment file: " + filePath);
+ Properties environmentProps = new Properties();
+
+ try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ // Skip empty lines or comments
+ if (line.trim().isEmpty() || line.trim().startsWith("#")) {
+ continue;
+ }
+ // Split the line into key-value pair
+ String[] parts = line.split("=", 2);
+ logger.info("parts: " + Arrays.toString(parts));
+ if (parts.length == 2) {
+ String key = parts[0].trim();
+ String value = parts[1].trim();
+ // Remove any surrounding quotes from value
+ value = value.replace("\"", "");
+ environmentProps.put(key, value);
+ }
+ }
+ // Access the environment variables
+ return environmentProps.getProperty("UNIVERSE");
+ } catch (IOException e) {
+ logger.error("Error reading environment file: " + e.getMessage());
+ throw new RuntimeException("Error reading environment file: " + e.getMessage());
+ }
+ }
+ // Fetch parameters from AWS SSM using AWS SDK v1
+ private static Map getParameterValues(List parameterNames) {
+ try {
+ GetParametersRequest request = new GetParametersRequest()
+ .withNames(parameterNames)
+ .withWithDecryption(true);
+
+ GetParametersResult response = ssmClient.getParameters(request);
+
+ return response.getParameters().stream()
+ .collect(Collectors.toMap(Parameter::getName, Parameter::getValue));
+ } catch (AWSSimpleSystemsManagementException e) {
+ logger.error("Error fetching parameters from SSM.", e);
+ throw e; // Rethrow or handle the exception if necessary
+ }
+ }
+
+ // Kafka Producer properties
+ public static Properties getProducerProps() {
+ Properties producerProps = new Properties();
+
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retriesConfig));
+ producerProps.put(ProducerConfig.LINGER_MS_CONFIG, Integer.parseInt(lingerMsConfig));
+ producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.parseInt(batchSizeConfig));
+ producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // Default buffer memory setting
+ logger.info("Kafka Producer properties initialized.");
+ return producerProps;
+ }
+
+ // Ensure the SSM client is closed when the application shuts down
+ public static void shutdown() {
+ if (ssmClient != null) {
+ try {
+ ssmClient.shutdown();
+ logger.info("SSM client closed successfully.");
+ } catch (Exception e) {
+ logger.error("Error while closing SSM client.", e);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaProducerService.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaProducerService.java
new file mode 100644
index 0000000000..4cb587cf29
--- /dev/null
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaProducerService.java
@@ -0,0 +1,66 @@
+package com.bazaarvoice.emodb.sor.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Future;
+
+public class KafkaProducerService {
+ private static final Logger _log = LoggerFactory.getLogger(KafkaProducerService.class);
+ private final KafkaProducer producer; // Changed to String
+
+ public KafkaProducerService() {
+ this.producer = new KafkaProducer<>(KafkaConfig.getProducerProps());
+ _log.info("KafkaProducerService initialized with producer properties: {}", KafkaConfig.getProducerProps());
+ }
+
+ /**
+ * Sends each message from the collection to the specified Kafka topic separately.
+ *
+ * @param topic The Kafka topic.
+ * @param events The collection of messages to be sent.
+ */
+ public void sendMessages(String topic, Collection events, String queueType) {
+ LocalDateTime startTime = LocalDateTime.now();
+ _log.info("Sending {} messages to topic '{}'", events.size(), topic);
+ List> futures = new ArrayList<>();
+ // Use async sendMessage and collect futures
+ for (T event : events) {
+ futures.add(producer.send(new ProducerRecord<>(topic, event.toString())));
+ }
+
+ // Wait for all futures to complete
+ for (Future future : futures) {
+ try {
+ future.get(); // Only blocks if a future is not yet complete
+ } catch (Exception e) {
+ _log.error("Error while sending message to Kafka: {}", e.getMessage());
+ throw new RuntimeException("Error sending messages to Kafka", e);
+ }
+ }
+ _log.info("Finished sending messages to topic '{}' time taken : {} milliseconds", topic, Duration.between(startTime, LocalDateTime.now()).toMillis());
+ }
+
+
+ /**
+ * Closes the producer to release resources.
+ */
+ public void close() {
+ _log.info("Closing Kafka producer.");
+ try {
+ producer.flush();
+ producer.close();
+ } catch (Exception e) {
+ _log.error("Error while closing Kafka producer: ", e);
+ throw e;
+ }
+ }
+}
\ No newline at end of file
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/ssm/ParameterStoreUtil.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/ssm/ParameterStoreUtil.java
new file mode 100644
index 0000000000..693901b264
--- /dev/null
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/ssm/ParameterStoreUtil.java
@@ -0,0 +1,120 @@
+package com.bazaarvoice.emodb.sor.ssm;
+
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder;
+import com.amazonaws.services.simplesystemsmanagement.model.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility class for interacting with AWS Parameter Store using AWS SDK v1.
+ */
+public class ParameterStoreUtil {
+
+ private static final Logger logger = LoggerFactory.getLogger(ParameterStoreUtil.class);
+ private final AWSSimpleSystemsManagement ssmClient;
+
+ /**
+ * Constructor to initialize the SSM client
+ */
+ public ParameterStoreUtil() {
+ // Create SSM client with default credentials and region
+ ssmClient = AWSSimpleSystemsManagementClientBuilder.standard()
+ .build();
+ }
+
+ /**
+ * Fetches a parameter from AWS Parameter Store.
+ *
+ * @param parameterName The name of the parameter to fetch
+ * @return The value of the parameter
+ * @throws IllegalArgumentException If the parameterName is null or empty
+ */
+ public String getParameter(String parameterName) {
+ if (parameterName == null || parameterName.isEmpty()) {
+ logger.error("Parameter name cannot be null or empty");
+ throw new IllegalArgumentException("Parameter name cannot be null or empty");
+ }
+
+ try {
+
+ GetParameterRequest request = new GetParameterRequest().withName(parameterName);
+ GetParameterResult result = ssmClient.getParameter(request);
+ return result.getParameter().getValue();
+
+ } catch (ParameterNotFoundException e) {
+ logger.error("Parameter not found: {}", parameterName, e);
+ throw new RuntimeException("Parameter not found: " + parameterName, e);
+
+ } catch (AWSSimpleSystemsManagementException e) {
+ logger.error("Error fetching parameter from AWS SSM: {}", e.getMessage(), e);
+ throw new RuntimeException("Error fetching parameter from AWS SSM: " + parameterName, e);
+
+ } catch (Exception e) {
+ logger.error("Unexpected error while fetching parameter: {}", parameterName, e);
+ throw new RuntimeException("Unexpected error fetching parameter: " + parameterName, e);
+ }
+ }
+
+ /**
+ * Fetches multiple parameters from AWS Parameter Store in a batch.
+ *
+ * @param parameterNames The list of parameter names to fetch
+ * @return A map of parameter names to their values
+ * @throws IllegalArgumentException If the parameterNames list is null or empty
+ */
+ public Map getParameters(List parameterNames) {
+ if (parameterNames == null || parameterNames.isEmpty()) {
+ logger.error("Parameter names list cannot be null or empty");
+ throw new IllegalArgumentException("Parameter names list cannot be null or empty");
+ }
+
+ try {
+
+ GetParametersRequest request = new GetParametersRequest().withNames(parameterNames);
+ GetParametersResult result = ssmClient.getParameters(request);
+
+ // Map the result to a Map of parameter names and values
+ Map parameters = new HashMap<>();
+ result.getParameters().forEach(param -> parameters.put(param.getName(), param.getValue()));
+
+ // Log any parameters that were not found
+ if (!result.getInvalidParameters().isEmpty()) {
+ logger.warn("The following parameters were not found: {}", result.getInvalidParameters());
+ }
+
+ return parameters;
+
+ } catch (AWSSimpleSystemsManagementException e) {
+ logger.error("Error fetching parameters from AWS SSM: {}", e.getMessage(), e);
+ throw new RuntimeException("Error fetching parameters from AWS SSM: " + parameterNames, e);
+
+ } catch (Exception e) {
+ logger.error("Unexpected error while fetching parameters: {}", parameterNames, e);
+ throw new RuntimeException("Unexpected error fetching parameters: " + parameterNames, e);
+ }
+ }
+
+ public Long updateParameter(String key, String value) {
+ try {
+ if (key == null || key.trim().isEmpty()) {
+ logger.error("parameter name cannot be null or blank");
+ throw new IllegalArgumentException("parameter name cannot be null or blank");
+ }
+
+ PutParameterRequest request = new PutParameterRequest().withName(key).withValue(value).withOverwrite(true);
+
+ PutParameterResult response = ssmClient.putParameter(request);
+ logger.info("Successfully updated parameter: " + key + " with value: " + value + ", Update Version: " + response.getVersion());
+ return response.getVersion();
+ } catch (Exception e) {
+ logger.error("Failed to update parameter: " + key + " with value: " + value, e);
+ throw new RuntimeException("Unexpected error updating parameter: " + key + " with value: " + value, e);
+ }
+ }
+
+}
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java
index 48da779c69..442de20665 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java
@@ -16,6 +16,7 @@
import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
import com.bazaarvoice.emodb.sor.delta.Delta;
import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.test.SystemClock;
import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs;
import com.bazaarvoice.emodb.table.db.Table;
@@ -485,7 +486,7 @@ public void compact(Table table, String key, UUID compactionKey, Compaction comp
}
};
- final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry());
+ final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry(), mock(KafkaProducerService.class));
// Create a table for our test
dataStore.createTable(tableName,
@@ -571,7 +572,7 @@ public Record read(Key key, ReadConsistency ignored) {
// Configure the data DAO to read 10 columns initially, causing other column reads to be read lazily
dataDAO.setColumnBatchSize(10);
- final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry());
+ final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry(), mock(KafkaProducerService.class));
// Create a table for our test
dataStore.createTable(tableName,
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java
index 3cf9b7b50f..bb66c772fd 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java
@@ -14,6 +14,7 @@
import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
import com.bazaarvoice.emodb.sor.delta.Delta;
import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.test.SystemClock;
import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs;
import com.codahale.metrics.MetricRegistry;
@@ -33,6 +34,7 @@
import java.util.Set;
import java.util.UUID;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
@@ -47,7 +49,7 @@ public class DataStoreTest {
@Test
public void testDeltas() throws Exception {
- DataStore store = new InMemoryDataStore(new MetricRegistry());
+ DataStore store = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
assertFalse(store.getTableExists(TABLE));
@@ -167,7 +169,7 @@ public void testDeltas() throws Exception {
@Test
public void testRecordTimestamps() throws Exception {
- DataStore store = new InMemoryDataStore(new MetricRegistry());
+ DataStore store = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
assertFalse(store.getTableExists(TABLE));
@@ -262,7 +264,7 @@ record = store.get(TABLE, KEY1);
@Test
public void testRecordTimestampsWithEventTags() throws Exception {
- DataStore store = new InMemoryDataStore(new MetricRegistry());
+ DataStore store = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
assertFalse(store.getTableExists(TABLE));
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java
index ac585fa220..6030144919 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java
@@ -10,6 +10,7 @@
import com.bazaarvoice.emodb.sor.db.astyanax.ChangeEncoder;
import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs;
import com.bazaarvoice.emodb.table.db.Table;
import com.bazaarvoice.emodb.table.db.astyanax.PlacementCache;
@@ -43,7 +44,7 @@ public List getSplits(Table table, int recordsPerSplit, int localResplit
}
};
- DataStore dataStore = new InMemoryDataStore(dataDao, new MetricRegistry());
+ DataStore dataStore = new InMemoryDataStore(dataDao, new MetricRegistry(), mock(KafkaProducerService.class));
dataStore.createTable("table", new TableOptionsBuilder().setPlacement("default").build(),
Collections.emptyMap(), new AuditBuilder().build());
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java
index 7377838dc5..36bdcdf8e4 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java
@@ -24,6 +24,7 @@
import com.bazaarvoice.emodb.sor.db.test.DeltaClusteringKey;
import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.log.NullSlowQueryLog;
import com.bazaarvoice.emodb.table.db.Table;
import com.bazaarvoice.emodb.table.db.test.InMemoryTableDAO;
@@ -65,7 +66,7 @@ public void testRedundantDeltas() throws Exception {
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -122,7 +123,7 @@ public void testMinUUIDDelta() throws Exception {
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -159,7 +160,7 @@ public void testRedundancyWithTags() throws Exception {
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -240,7 +241,7 @@ public void testTagsForNestedMapDeltas() {
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -260,7 +261,7 @@ public void testRedundancyWithCompactionAndUnchangedTag() throws Exception {
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -337,7 +338,7 @@ public void testPartialCompactionWithNoRedundancy() throws Exception {
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), tableDao, dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -409,7 +410,7 @@ public void testPartialCompactionWithRedundancy() throws Exception {
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), tableDao, dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java
index 60574f680c..047c373f72 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java
@@ -8,6 +8,7 @@
import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.test.SystemClock;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableMap;
@@ -19,6 +20,7 @@
import java.util.UUID;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
@@ -34,7 +36,7 @@ public class SorUpdateTest {
public void SetupTest() {
final InMemoryDataReaderDAO dataDAO = new InMemoryDataReaderDAO();
_eventWriterRegistry = new DatabusEventWriterRegistry();
- _dataStore = new InMemoryDataStore(_eventWriterRegistry, dataDAO, new MetricRegistry());
+ _dataStore = new InMemoryDataStore(_eventWriterRegistry, dataDAO, new MetricRegistry(), mock(KafkaProducerService.class));
// Create a table for our test
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java
index c67f985342..66e4d5fe2b 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java
@@ -11,6 +11,7 @@
import com.bazaarvoice.emodb.sor.core.test.InMemoryHistoryStore;
import com.bazaarvoice.emodb.sor.core.test.InMemoryMapStore;
import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.log.NullSlowQueryLog;
import com.bazaarvoice.emodb.table.db.TableDAO;
import com.bazaarvoice.emodb.table.db.test.InMemoryTableDAO;
@@ -20,6 +21,8 @@
import java.time.Clock;
import java.util.Optional;
+import static org.mockito.Mockito.mock;
+
/**
* Wrapper around a set of {@link DataStore} instances that replicate to each other,
* simulating a set of eventually consistent data centers.
@@ -63,12 +66,12 @@ public MultiDCDataStores(int numDCs, boolean asyncCompacter, MetricRegistry metr
if (asyncCompacter) {
_stores[i] = new DefaultDataStore(new SimpleLifeCycleRegistry(), metricRegistry, new DatabusEventWriterRegistry(), _tableDao,
_inMemoryDaos[i].setHistoryStore(_historyStores[i]), _replDaos[i], new NullSlowQueryLog(), _historyStores[i],
- Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), new DiscardingAuditWriter(), new InMemoryMapStore<>(), Clock.systemUTC());
+ Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), new DiscardingAuditWriter(), new InMemoryMapStore<>(), Clock.systemUTC(), mock(KafkaProducerService.class));
} else {
_stores[i] = new DefaultDataStore(new DatabusEventWriterRegistry(), _tableDao, _inMemoryDaos[i].setHistoryStore(_historyStores[i]),
_replDaos[i], new NullSlowQueryLog(), MoreExecutors.newDirectExecutorService(), _historyStores[i],
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC(), mock(KafkaProducerService.class));
}
}
}
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java b/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java
index 5ad5ff357f..f86ba00818 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java
@@ -27,6 +27,7 @@
import com.bazaarvoice.emodb.sor.delta.Delta;
import com.bazaarvoice.emodb.sor.delta.Deltas;
import com.bazaarvoice.emodb.sor.delta.MapDeltaBuilder;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.table.db.MoveType;
import com.bazaarvoice.emodb.table.db.Table;
import com.bazaarvoice.emodb.table.db.TableBackingStore;
@@ -1981,7 +1982,7 @@ dataCenter, mock(RateLimiterCache.class), dataCopyDAO, dataPurgeDAO,
}
private InMemoryDataStore newBackingStore(MetricRegistry metricRegistry) {
- InMemoryDataStore store = new InMemoryDataStore(metricRegistry);
+ InMemoryDataStore store = new InMemoryDataStore(metricRegistry, mock(KafkaProducerService.class));
store.createTable("__system:table", newOptions(PL_GLOBAL), ImmutableMap.of(), newAudit());
store.createTable("__system:table_uuid", newOptions(PL_GLOBAL), ImmutableMap.of(), newAudit());
store.createTable("__system:table_unpublished_databus_events", newOptions(PL_GLOBAL), ImmutableMap.of(), newAudit());
diff --git a/table/pom.xml b/table/pom.xml
index c0c8d86ce2..241962d945 100644
--- a/table/pom.xml
+++ b/table/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.205-SNAPSHOT
+ 6.5.209-SNAPSHOT
../parent/pom.xml
diff --git a/uac-api/pom.xml b/uac-api/pom.xml
index 1298413f2b..399e45aef5 100644
--- a/uac-api/pom.xml
+++ b/uac-api/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.205-SNAPSHOT
+ 6.5.209-SNAPSHOT
../parent/pom.xml
diff --git a/uac-client-jersey2/pom.xml b/uac-client-jersey2/pom.xml
index 392bbb0d28..67fd9c73b2 100644
--- a/uac-client-jersey2/pom.xml
+++ b/uac-client-jersey2/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.205-SNAPSHOT
+ 6.5.209-SNAPSHOT
../parent/pom.xml
diff --git a/uac-client/pom.xml b/uac-client/pom.xml
index e583ba615c..45cbb28f45 100644
--- a/uac-client/pom.xml
+++ b/uac-client/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.205-SNAPSHOT
+ 6.5.209-SNAPSHOT
../parent/pom.xml
diff --git a/web-local/pom.xml b/web-local/pom.xml
index 33013ccfd8..f2e21d8d68 100644
--- a/web-local/pom.xml
+++ b/web-local/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.205-SNAPSHOT
+ 6.5.209-SNAPSHOT
../parent/pom.xml
diff --git a/web/pom.xml b/web/pom.xml
index 8bc7cd02ef..a3e65691c2 100644
--- a/web/pom.xml
+++ b/web/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.205-SNAPSHOT
+ 6.5.209-SNAPSHOT
../parent/pom.xml
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/AbstractSubjectDatabus.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/AbstractSubjectDatabus.java
index 8e9f9b2e21..7c73fd693d 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/AbstractSubjectDatabus.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/AbstractSubjectDatabus.java
@@ -61,6 +61,11 @@ public long getEventCountUpTo(Subject subject, @PartitionKey String subscription
return databus(subject).getEventCountUpTo(subscription, limit);
}
+ @Override
+ public long getMasterCount(Subject subject){
+ return databus(subject).getMasterCount();
+ }
+
@Override
public long getClaimCount(Subject subject, @PartitionKey String subscription) {
return databus(subject).getClaimCount(subscription);
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/DatabusResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/DatabusResource1.java
index ca356d0ac9..91e23b90e9 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/DatabusResource1.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/DatabusResource1.java
@@ -173,6 +173,18 @@ public long getEventCount(@QueryParam ("partitioned") BooleanParam partitioned,
}
}
+ @GET
+ @Path ("{channel}/uncached_size")
+ @Timed (name = "bv.emodb.databus.DatabusResource1.getMasterEventCount", absolute = true)
+ @ApiOperation (value = "Gets the master event count.",
+ notes = "Returns a long.",
+ response = long.class
+ )
+ public long getEventCountInMaster(@QueryParam ("partitioned") BooleanParam partitioned,
+ @Authenticated Subject subject) {
+ return getClient(partitioned).getMasterCount(subject);
+ }
+
@GET
@Path ("{subscription}/claimcount")
@RequiresPermissions ("databus|get_status|{subscription}")
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/SubjectDatabus.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/SubjectDatabus.java
index 44493228e9..bcac1c3e8b 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/SubjectDatabus.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/SubjectDatabus.java
@@ -52,6 +52,8 @@ Subscription getSubscription(Subject subject, String subscription)
long getEventCountUpTo(Subject subject, String subscription, long limit);
+ long getMasterCount(Subject subject);
+
long getClaimCount(Subject subject, String subscription);
Iterator peek(Subject subject, String subscription, int limit);
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java
index e0a312c623..0e94b1100f 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java
@@ -6,6 +6,9 @@
import com.bazaarvoice.emodb.queue.api.MoveQueueStatus;
import com.bazaarvoice.emodb.queue.api.QueueService;
import com.bazaarvoice.emodb.queue.client.QueueServiceAuthenticator;
+import com.bazaarvoice.emodb.queue.core.Entities.QueueExecutionAttributes;
+import com.bazaarvoice.emodb.queue.core.ssm.ParameterStoreUtil;
+import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.bazaarvoice.emodb.web.auth.Permissions;
import com.bazaarvoice.emodb.web.auth.resource.NamedResource;
import com.bazaarvoice.emodb.web.jersey.params.SecondsParam;
@@ -28,6 +31,7 @@
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
@@ -63,11 +67,17 @@ public class QueueResource1 {
private final Meter _sendBatchNull_qr1;
+ private final ParameterStoreUtil _parameterStoreUtil;
+
+ private final StepFunctionService _stepFunctionService;
+
public QueueResource1(QueueService queueService, QueueServiceAuthenticator queueClient, MetricRegistry metricRegistry) {
//this._metricRegistry = metricRegistry;
_queueService = requireNonNull(queueService, "queueService");
_queueClient = requireNonNull(queueClient, "queueClient");
+ _parameterStoreUtil = new ParameterStoreUtil();
+ _stepFunctionService = new StepFunctionService();
_messageCount_qr1 = metricRegistry.meter(MetricRegistry.name(QueueResource1.class, "polledMessageCount_qr1"));
_nullPollsCount_qr1 = metricRegistry.meter(MetricRegistry.name(QueueResource1.class, "nullPollsCount_qr1"));
_sendCount_qr1= metricRegistry.meter(MetricRegistry.name(QueueResource1.class,"sendCount_qr1"));
@@ -341,6 +351,36 @@ public SuccessResponse purge(@QueryParam("partitioned") BooleanParam partitioned
return SuccessResponse.instance();
}
+ @PUT
+ @Path("/UpdateParameterStore")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @ApiOperation (value = "update param operation at aws parameter store .",
+ notes = "Returns a SuccessResponse.", response = SuccessResponse.class)
+ public SuccessResponse updateParam(Map keyValuePair) {
+ String key = keyValuePair.keySet().iterator().next();
+ String value = keyValuePair.get(key);
+
+ Long update_version = _parameterStoreUtil.updateParameter(key, value);
+ return SuccessResponse.instance().with(ImmutableMap.of("status", "200 | ssm-parameter updated successfully, update_version: " + update_version));
+ }
+
+ @PUT
+ @Path("/{queue_type}/{queue_name}/QueueExecutionAttributes")
+ @RequiresPermissions("queue|poll|{queue_name}")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @ApiOperation (value = "update queue execution attributes .", notes = "Returns a SuccessResponse.", response = SuccessResponse.class)
+ public SuccessResponse updateQueueExecutionAttributes(@PathParam("queue_type") String queueType, @PathParam("queue_name") String queueName, QueueExecutionAttributes newExecAttributes) {
+ newExecAttributes.setQueueName(queueName);
+ newExecAttributes.setQueueType(queueType);
+ _stepFunctionService.startSFNWithAttributes(newExecAttributes);
+
+ if("DISABLED".equals(newExecAttributes.getStatus())) {
+ return SuccessResponse.instance().with(ImmutableMap.of("status", "200 | step function successfully stopped(if any execution existed) as status=DISABLED was provided"));
+ } else {
+ return SuccessResponse.instance().with(ImmutableMap.of("status", "200 | step function successfully re-started, or started with updated attributes"));
+ }
+ }
+
private QueueService getService(BooleanParam partitioned, String apiKey) {
return partitioned != null && partitioned.get() ? _queueService : _queueClient.usingCredentials(apiKey);
}
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/sor/DataStoreResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/sor/DataStoreResource1.java
index 27baf89a04..2efd03733e 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/sor/DataStoreResource1.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/sor/DataStoreResource1.java
@@ -8,19 +8,7 @@
import com.bazaarvoice.emodb.common.json.OrderedJson;
import com.bazaarvoice.emodb.common.uuid.TimeUUIDs;
import com.bazaarvoice.emodb.datacenter.api.DataCenter;
-import com.bazaarvoice.emodb.sor.api.Audit;
-import com.bazaarvoice.emodb.sor.api.Change;
-import com.bazaarvoice.emodb.sor.api.CompactionControlSource;
-import com.bazaarvoice.emodb.sor.api.Coordinate;
-import com.bazaarvoice.emodb.sor.api.DataStore;
-import com.bazaarvoice.emodb.sor.api.FacadeOptions;
-import com.bazaarvoice.emodb.sor.api.Intrinsic;
-import com.bazaarvoice.emodb.sor.api.PurgeStatus;
-import com.bazaarvoice.emodb.sor.api.Table;
-import com.bazaarvoice.emodb.sor.api.TableOptions;
-import com.bazaarvoice.emodb.sor.api.UnpublishedDatabusEvent;
-import com.bazaarvoice.emodb.sor.api.Update;
-import com.bazaarvoice.emodb.sor.api.WriteConsistency;
+import com.bazaarvoice.emodb.sor.api.*;
import com.bazaarvoice.emodb.sor.core.DataStoreAsync;
import com.bazaarvoice.emodb.sor.delta.Delta;
import com.bazaarvoice.emodb.sor.delta.Deltas;
@@ -947,6 +935,59 @@ private Response redirectTo(DataCenter dataCenter, URI requestUri) {
build();
}
+ @POST
+ @Path("{channel}/sendbatch1")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Timed(name = "bv.emodb.sor.DataStoreResource1.updateRef", absolute = true)
+ @ApiOperation(value = "Updates a reference",
+ notes = "Updates a reference",
+ response = SuccessResponse.class
+ )
+ public SuccessResponse updateRefToDatabus(InputStream in,
+ @QueryParam("consistency") @DefaultValue("STRONG") WriteConsistencyParam consistency,
+ @QueryParam("tag") List tags,
+ @Authenticated Subject subject) {
+ Set tagsSet = (tags == null) ? ImmutableSet.of() : Sets.newHashSet(tags);
+ Iterable updateRefs = asSubjectSafeUpdateRefModelIterable(new JsonStreamingArrayParser<>(in, UpdateRefModel.class), subject, true);
+ // Perform the update by writing to Databus
+ _dataStore.updateRefInDatabus(updateRefs, tagsSet, false);
+ return SuccessResponse.instance();
+ }
+
+ /**
+ * Takes an update ref stream from a subject and performs the following actions on it:
+ * 1. Checks that the subject has permission to update the record being updated
+ * 2. Applies any active rate limiting for updates by the subject
+ */
+ private Iterable asSubjectSafeUpdateRefModelIterable(Iterator updateRefs, final Subject subject, final boolean isFacade) {
+ return Iterables.filter(
+ OneTimeIterable.wrap(updateRefs),
+ new Predicate() {
+ @Override
+ public boolean apply(UpdateRefModel updateRefModel) {
+ NamedResource resource = new NamedResource(updateRefModel.getTable());
+ boolean hasPermission;
+ if (isFacade) {
+ hasPermission = subject.hasPermission(Permissions.updateFacade(resource));
+ } else {
+ hasPermission = subject.hasPermission(Permissions.updateSorTable(resource));
+ }
+
+ if (!hasPermission) {
+ throw new UnauthorizedException("not authorized to update table " + updateRefModel.getTable());
+ }
+
+ // Facades are a unique case used internally for shoveling data across data centers, so don't rate
+ // limit facade updates.
+ if (!isFacade) {
+ _updateThrottle.beforeUpdate(subject.getId());
+ }
+
+ return true;
+ }
+ });
+ }
+
private UUID parseUuidOrTimestamp(@Nullable String string, boolean rangeUpperEnd) {
if (string == null) {
return null;
diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/purge/PurgeTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/purge/PurgeTest.java
index fd11acab47..d0d45f8435 100644
--- a/web/src/test/java/com/bazaarvoice/emodb/web/purge/PurgeTest.java
+++ b/web/src/test/java/com/bazaarvoice/emodb/web/purge/PurgeTest.java
@@ -24,6 +24,7 @@
import com.bazaarvoice.emodb.sor.core.PurgeResult;
import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.web.resources.sor.AuditParam;
import com.bazaarvoice.emodb.web.resources.sor.DataStoreResource1;
import com.bazaarvoice.emodb.web.throttling.UnlimitedDataStoreUpdateThrottler;
@@ -84,7 +85,7 @@ public void setUp() throws Exception {
lifeCycleRegistry, _queueService, "testqueue", _jobHandlerRegistry, _jobStatusDAO, _curator,
1, Duration.ZERO, 100, Duration.ofHours(1));
- _store = new InMemoryDataStore(new MetricRegistry());
+ _store = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class));
_dataStoreResource = new DataStoreResource1(_store, new DefaultDataStoreAsync(_store, _service, _jobHandlerRegistry),
mock(CompactionControlSource.class), new UnlimitedDataStoreUpdateThrottler());
diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/ScanUploaderTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/ScanUploaderTest.java
index a96563df8d..03c4b58787 100644
--- a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/ScanUploaderTest.java
+++ b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/ScanUploaderTest.java
@@ -27,6 +27,7 @@
import com.bazaarvoice.emodb.sor.db.Record;
import com.bazaarvoice.emodb.sor.db.ScanRange;
import com.bazaarvoice.emodb.sor.db.ScanRangeSplits;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.table.db.Table;
import com.bazaarvoice.emodb.table.db.astyanax.AstyanaxStorage;
import com.bazaarvoice.emodb.web.scanner.control.DistributedScanRangeMonitor;
@@ -420,8 +421,9 @@ dataTools, scanWriterGenerator, compactionControlSource, mock(LifeCycleRegistry.
@Test
public void testScanUploadFromExistingScan() throws Exception {
MetricRegistry metricRegistry = new MetricRegistry();
+ KafkaProducerService kafkaProducerService = mock(KafkaProducerService.class);
// Use an in-memory data store but override the default splits operation to return 4 splits for the test placement
- InMemoryDataStore dataStore = spy(new InMemoryDataStore(metricRegistry));
+ InMemoryDataStore dataStore = spy(new InMemoryDataStore(metricRegistry, kafkaProducerService));
when(dataStore.getScanRangeSplits("app_global:default", 1000000, Optional.empty()))
.thenReturn(new ScanRangeSplits(ImmutableList.of(
createSimpleSplitGroup("00", "40"),
@@ -621,7 +623,7 @@ public void testScanFailureRecovery()
Lists.newArrayList(), Lists.newArrayList());
InMemoryScanWorkflow scanWorkflow = new InMemoryScanWorkflow();
- ScanStatusDAO scanStatusDAO = new DataStoreScanStatusDAO(new InMemoryDataStore(new MetricRegistry()), "scan_table", "app_global:sys");
+ ScanStatusDAO scanStatusDAO = new DataStoreScanStatusDAO(new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class)), "scan_table", "app_global:sys");
LocalScanUploadMonitor monitor = new LocalScanUploadMonitor(scanWorkflow, scanStatusDAO,
mock(ScanWriterGenerator.class), mock(StashStateListener.class), mock(ScanCountListener.class),
mock(DataTools.class), new InMemoryCompactionControlSource(), mock(DataCenters.class));
diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreScanStatusDAOTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreScanStatusDAOTest.java
index 0c19fa2c14..0fedee1486 100644
--- a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreScanStatusDAOTest.java
+++ b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreScanStatusDAOTest.java
@@ -6,6 +6,7 @@
import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
import com.bazaarvoice.emodb.sor.db.ScanRange;
import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.web.scanner.ScanDestination;
import com.bazaarvoice.emodb.web.scanner.ScanOptions;
import com.codahale.metrics.MetricRegistry;
@@ -22,6 +23,7 @@
import java.util.List;
import java.util.Optional;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -33,7 +35,7 @@ public class DataStoreScanStatusDAOTest {
@BeforeMethod
public void setUp() {
- _dataStore = new InMemoryDataStore(new MetricRegistry());
+ _dataStore = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class));
_dao = new DataStoreScanStatusDAO(_dataStore, "scan_table", "app_global:sys");
}
diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreStashRequestDAOTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreStashRequestDAOTest.java
index 4e14f50fd1..aab4241cae 100644
--- a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreStashRequestDAOTest.java
+++ b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreStashRequestDAOTest.java
@@ -2,6 +2,7 @@
import com.bazaarvoice.emodb.sor.api.DataStore;
import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableSet;
import org.testng.annotations.BeforeMethod;
@@ -9,6 +10,7 @@
import java.util.Date;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -19,7 +21,7 @@ public class DataStoreStashRequestDAOTest {
@BeforeMethod
public void setUp() {
- _dataStore = new InMemoryDataStore(new MetricRegistry());
+ _dataStore = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class));
_dao = new DataStoreStashRequestDAO(_dataStore, "request_table", "app_global:sys");
}
diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/settings/SettingsManagerTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/settings/SettingsManagerTest.java
index f8c5758a07..fd7249564f 100644
--- a/web/src/test/java/com/bazaarvoice/emodb/web/settings/SettingsManagerTest.java
+++ b/web/src/test/java/com/bazaarvoice/emodb/web/settings/SettingsManagerTest.java
@@ -6,6 +6,7 @@
import com.bazaarvoice.emodb.sor.api.DataStore;
import com.bazaarvoice.emodb.sor.api.Intrinsic;
import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.codahale.metrics.MetricRegistry;
import com.google.common.cache.Cache;
import com.google.inject.util.Providers;
@@ -32,7 +33,7 @@ public class SettingsManagerTest {
@BeforeMethod
public void setUp() {
- _dataStore = new InMemoryDataStore(new MetricRegistry());
+ _dataStore = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class));
_cacheRegistry = mock(CacheRegistry.class);
_cacheHandle = mock(CacheHandle.class);
when(_cacheRegistry.register(eq("settings"), any(Cache.class), eq(true))).thenReturn(_cacheHandle);
diff --git a/yum/pom.xml b/yum/pom.xml
index b4c6f8e3a5..d82d25e11b 100644
--- a/yum/pom.xml
+++ b/yum/pom.xml
@@ -4,7 +4,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.205-SNAPSHOT
+ 6.5.209-SNAPSHOT
../parent/pom.xml