diff --git a/quality/integration/src/test/java/test/integration/auth/TableAuthIdentityManagerDAOTest.java b/quality/integration/src/test/java/test/integration/auth/TableAuthIdentityManagerDAOTest.java
index 67c3849a7..e6da43c02 100644
--- a/quality/integration/src/test/java/test/integration/auth/TableAuthIdentityManagerDAOTest.java
+++ b/quality/integration/src/test/java/test/integration/auth/TableAuthIdentityManagerDAOTest.java
@@ -3,13 +3,12 @@
 import com.bazaarvoice.emodb.auth.apikey.ApiKey;
 import com.bazaarvoice.emodb.auth.apikey.ApiKeyModification;
 import com.bazaarvoice.emodb.auth.identity.TableAuthIdentityManagerDAO;
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.AuditBuilder;
 import com.bazaarvoice.emodb.sor.api.DataStore;
 import com.bazaarvoice.emodb.sor.api.Intrinsic;
 import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
 import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs;
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.collect.ImmutableList;
@@ -41,7 +40,7 @@ public class TableAuthIdentityManagerDAOTest {
      */
     @Test
    public void testRebuildIdIndex() {
-        DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
         Supplier<String> idSupplier = () -> "id0";
         TableAuthIdentityManagerDAO<ApiKey> tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>(
                 ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys",
@@ -79,7 +78,7 @@ public void testRebuildIdIndex() {
 
     @Test
     public void testGrandfatheredInId() {
-        DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
         Supplier<String> idSupplier = () -> "id0";
         TableAuthIdentityManagerDAO<ApiKey> tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>(
                 ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys",
@@ -131,7 +130,7 @@ public void testGrandfatheredInId() {
 
     @Test
     public void testIdAttributeCompatibility() {
-        DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
         Supplier<String> idSupplier = () -> "id0";
         TableAuthIdentityManagerDAO<ApiKey> tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>(
                 ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys",
diff --git a/quality/integration/src/test/java/test/integration/auth/TableRoleManagerDAOTest.java b/quality/integration/src/test/java/test/integration/auth/TableRoleManagerDAOTest.java
index 0c54beb6b..a3db269d9 100644
--- a/quality/integration/src/test/java/test/integration/auth/TableRoleManagerDAOTest.java
+++ b/quality/integration/src/test/java/test/integration/auth/TableRoleManagerDAOTest.java
@@ -9,8 +9,6 @@
 import com.bazaarvoice.emodb.auth.role.RoleModification;
 import com.bazaarvoice.emodb.auth.role.RoleNotFoundException;
 import com.bazaarvoice.emodb.auth.role.TableRoleManagerDAO;
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.Audit;
 import com.bazaarvoice.emodb.sor.api.DataStore;
 import com.bazaarvoice.emodb.sor.api.Intrinsic;
@@ -20,6 +18,7 @@
 import com.bazaarvoice.emodb.sor.api.WriteConsistency;
 import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
 import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.web.auth.EmoPermissionResolver;
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.collect.ImmutableList;
@@ -62,7 +61,7 @@ public class TableRoleManagerDAOTest {
     @BeforeMethod
     public void setUp() {
         // DataStore and PermissionManager are fairly heavy to fully mock. Use spies on in-memory implementations instead
-        _backendDataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        _backendDataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
         _dataStore = spy(_backendDataStore);
         _permissionResolver = new EmoPermissionResolver(null, null);
         _backendPermissionManager = new InMemoryPermissionManager(_permissionResolver);
diff --git a/quality/integration/src/test/java/test/integration/sor/CasStashTableTest.java b/quality/integration/src/test/java/test/integration/sor/CasStashTableTest.java
index 702cf97d4..79ce74873 100644
--- a/quality/integration/src/test/java/test/integration/sor/CasStashTableTest.java
+++ b/quality/integration/src/test/java/test/integration/sor/CasStashTableTest.java
@@ -10,8 +10,6 @@
 import com.bazaarvoice.emodb.common.zookeeper.store.ValueStore;
 import com.bazaarvoice.emodb.datacenter.api.DataCenter;
 import com.bazaarvoice.emodb.datacenter.api.DataCenters;
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.Audit;
 import com.bazaarvoice.emodb.sor.api.AuditBuilder;
 import com.bazaarvoice.emodb.sor.condition.Conditions;
@@ -19,6 +17,7 @@
 import com.bazaarvoice.emodb.sor.db.astyanax.DeltaPlacementFactory;
 import com.bazaarvoice.emodb.sor.delta.Delta;
 import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.table.db.Table;
 import com.bazaarvoice.emodb.table.db.astyanax.AstyanaxTableDAO;
 import com.bazaarvoice.emodb.table.db.astyanax.CQLStashTableDAO;
@@ -99,7 +98,7 @@ public void setup() throws Exception {
         _astyanaxTableDAO.setCQLStashTableDAO(cqlStashTableDAO);
         // Don't store table definitions in the actual backing store so as not to interrupt other tests. Use a
         // private in-memory implementation.
-        _tableBackingStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        _tableBackingStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
         _astyanaxTableDAO.setBackingStore(_tableBackingStore);
 
         _lifeCycleRegistry.start();
diff --git a/sor/pom.xml b/sor/pom.xml
index e678cd1c6..a93fb4b0b 100644
--- a/sor/pom.xml
+++ b/sor/pom.xml
@@ -349,10 +349,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>com.bazaarvoice.emodb</groupId>
-            <artifactId>emodb-queue</artifactId>
-            <version>6.5.205-SNAPSHOT</version>
-            <scope>compile</scope>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-ssm</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
         </dependency>
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DefaultDataStore.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DefaultDataStore.java
index e52f2bc03..4b6475b59 100644
--- a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DefaultDataStore.java
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DefaultDataStore.java
@@ -5,10 +5,6 @@
 import com.bazaarvoice.emodb.common.json.deferred.LazyJsonMap;
 import com.bazaarvoice.emodb.common.uuid.TimeUUIDs;
 import com.bazaarvoice.emodb.common.zookeeper.store.MapStore;
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaConfig;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
-import com.bazaarvoice.emodb.queue.core.ssm.ParameterStoreUtil;
 import com.bazaarvoice.emodb.sor.api.*;
 import com.bazaarvoice.emodb.sor.audit.AuditWriter;
 import com.bazaarvoice.emodb.sor.compactioncontrol.LocalCompactionControl;
@@ -24,7 +20,10 @@
 import com.bazaarvoice.emodb.sor.db.ScanRange;
 import com.bazaarvoice.emodb.sor.db.ScanRangeSplits;
 import com.bazaarvoice.emodb.sor.delta.Delta;
+import com.bazaarvoice.emodb.sor.kafka.KafkaConfig;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.log.SlowQueryLog;
+import com.bazaarvoice.emodb.sor.ssm.ParameterStoreUtil;
 import com.bazaarvoice.emodb.table.db.DroppedTableException;
 import com.bazaarvoice.emodb.table.db.StashBlackListTableCondition;
 import com.bazaarvoice.emodb.table.db.StashTableDAO;
@@ -108,7 +107,6 @@ public class DefaultDataStore implements DataStore, DataProvider, DataTools, Tab
     private final Clock _clock;
     private final KafkaProducerService _kafkaProducerService;
     private ParameterStoreUtil parameterStoreUtil;
-    private final BaseEventStore _eventStore;
     private final Cache dataThrottlerCache = CacheBuilder.newBuilder()
             .expireAfterWrite(1, TimeUnit.MINUTES)
             .build();
@@ -120,10 +118,10 @@ public DefaultDataStore(LifeCycleRegistry lifeCycle, MetricRegistry metricRegist
                             DataReaderDAO dataReaderDao, DataWriterDAO dataWriterDao, SlowQueryLog slowQueryLog, HistoryStore historyStore,
                             @StashRoot Optional stashRootDirectory, @LocalCompactionControl CompactionControlSource compactionControlSource,
                             @StashBlackListTableCondition Condition stashBlackListTableCondition, AuditWriter auditWriter,
-                            @MinSplitSizeMap MapStore minSplitSizeMap, Clock clock, KafkaProducerService kafkaProducerService, BaseEventStore eventStore) {
+                            @MinSplitSizeMap MapStore minSplitSizeMap, Clock clock, KafkaProducerService kafkaProducerService) {
         this(eventWriterRegistry, tableDao, dataReaderDao, dataWriterDao, slowQueryLog,
                 defaultCompactionExecutor(lifeCycle), historyStore, stashRootDirectory, compactionControlSource, stashBlackListTableCondition, auditWriter,
-                minSplitSizeMap, metricRegistry, clock, kafkaProducerService, eventStore);
+                minSplitSizeMap, metricRegistry, clock, kafkaProducerService);
     }
 
     @VisibleForTesting
@@ -132,7 +130,7 @@ public DefaultDataStore(DatabusEventWriterRegistry eventWriterRegistry,TableDAO
                             SlowQueryLog slowQueryLog, ExecutorService compactionExecutor,
                             HistoryStore historyStore, Optional stashRootDirectory,
                             CompactionControlSource compactionControlSource, Condition stashBlackListTableCondition, AuditWriter auditWriter,
-                            MapStore minSplitSizeMap, MetricRegistry metricRegistry, Clock clock, KafkaProducerService kafkaProducerService, BaseEventStore eventStore) {
+                            MapStore minSplitSizeMap, MetricRegistry metricRegistry, Clock clock, KafkaProducerService kafkaProducerService) {
         _eventWriterRegistry = requireNonNull(eventWriterRegistry, "eventWriterRegistry");
         _tableDao = requireNonNull(tableDao, "tableDao");
         _dataReaderDao = requireNonNull(dataReaderDao, "dataReaderDao");
@@ -154,7 +152,6 @@ public DefaultDataStore(DatabusEventWriterRegistry eventWriterRegistry,TableDAO
         _clock = requireNonNull(clock, "clock");
         _kafkaProducerService = requireNonNull(kafkaProducerService, "kafkaProducerService");
         this.parameterStoreUtil = new ParameterStoreUtil();
-        _eventStore = eventStore;
     }
 
     /**
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java
index ec6cd6033..8f6d9da16 100644
--- a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java
@@ -1,13 +1,12 @@
 package com.bazaarvoice.emodb.sor.core.test;
 
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.audit.DiscardingAuditWriter;
 import com.bazaarvoice.emodb.sor.compactioncontrol.InMemoryCompactionControlSource;
 import com.bazaarvoice.emodb.sor.condition.Conditions;
 import com.bazaarvoice.emodb.sor.core.DatabusEventWriterRegistry;
 import com.bazaarvoice.emodb.sor.core.DefaultDataStore;
 import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.log.NullSlowQueryLog;
 import com.bazaarvoice.emodb.table.db.test.InMemoryTableDAO;
 import com.codahale.metrics.MetricRegistry;
@@ -21,19 +20,19 @@
  */
 public class InMemoryDataStore extends DefaultDataStore {
 
-    public InMemoryDataStore(MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService, BaseEventStore eventStore) {
-        this(new InMemoryDataReaderDAO(), metricRegistry, kafkaProducerService, eventStore);
+    public InMemoryDataStore(MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService) {
+        this(new InMemoryDataReaderDAO(), metricRegistry, kafkaProducerService);
     }
 
-    public InMemoryDataStore(InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService, BaseEventStore eventStore) {
-        this(new DatabusEventWriterRegistry(), dataDao, metricRegistry, kafkaProducerService, eventStore);
+    public InMemoryDataStore(InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService) {
+        this(new DatabusEventWriterRegistry(), dataDao, metricRegistry, kafkaProducerService);
     }
 
-    public InMemoryDataStore(DatabusEventWriterRegistry eventWriterRegistry, InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService, BaseEventStore eventStore) {
+    public InMemoryDataStore(DatabusEventWriterRegistry eventWriterRegistry, InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService) {
         super(eventWriterRegistry, new InMemoryTableDAO(), dataDao, dataDao,
                 new NullSlowQueryLog(), MoreExecutors.newDirectExecutorService(), new InMemoryHistoryStore(),
                 Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
-                new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC(), kafkaProducerService, eventStore);
+                new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC(), kafkaProducerService);
     }
 }
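Reviewer note: with BaseEventStore dropped from every constructor, in-memory stores now need only a MetricRegistry and a KafkaProducerService. A minimal sketch of the new test-setup pattern (table name is illustrative; note that new KafkaProducerService() still runs KafkaConfig's SSM-backed static initializer, so tests need that environment or a test double):

    DataStore store = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
    store.createTable("my:table", new TableOptionsBuilder().setPlacement("default").build(),
            Collections.emptyMap(), new AuditBuilder().build());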
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaAdminService.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaAdminService.java
new file mode 100644
index 000000000..8bdccc291
--- /dev/null
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaAdminService.java
@@ -0,0 +1,118 @@
+package com.bazaarvoice.emodb.sor.kafka;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class KafkaAdminService {
+    private static final Logger _log = LoggerFactory.getLogger(KafkaAdminService.class);
+    private final AdminClient adminClient;
+    // Cache for the list of all topics, refreshed after a one-minute TTL
+    private final Cache<String, Set<String>> topicListCache = CacheBuilder.newBuilder()
+            .expireAfterWrite(1, TimeUnit.MINUTES)
+            .build();
+
+    private static final String TOPIC_LIST_KEY = "allTopics";
+
+    public KafkaAdminService() {
+        this.adminClient = AdminClient.create(KafkaConfig.getAdminProps());
+    }
+
+    /**
+     * Creates a new Kafka topic with the specified configuration, unless a topic with the same name already exists.
+     *
+     * @param topic             The name of the topic.
+     * @param numPartitions     Number of partitions.
+     * @param replicationFactor Replication factor.
+     * @param queueType         The type of queue the topic backs (currently unused).
+     * @return {@code true} if the topic already existed, {@code false} if this call created it.
+     */
+    public Boolean createTopicIfNotExists(String topic, int numPartitions, short replicationFactor, String queueType) {
+        Boolean isExisting = isTopicExists(topic);
+        if (!isExisting) {
+            // Create the topic now
+            NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
+            try {
+                adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+                addToCache(topic);
+                _log.info("Created topic: {} with numPartitions: {} and replication factor {}", topic, numPartitions, replicationFactor);
+            } catch (Exception e) {
+                _log.error("Error creating topic {}: ", topic, e);
+                throw new RuntimeException(e);
+            }
+        }
+        return isExisting;
+    }
+
+    public void addToCache(String topic) {
+        Set<String> topics = topicListCache.getIfPresent(TOPIC_LIST_KEY);
+        if (topics == null) {
+            topics = new HashSet<>();
+        } else {
+            // Create a new mutable Set if the existing one is unmodifiable
+            topics = new HashSet<>(topics);
+        }
+        topics.add(topic);
+        topicListCache.put(TOPIC_LIST_KEY, topics);
+        _log.info("Added newly created topic to cache: {}", topic);
+    }
+
+    /**
+     * Checks if a Kafka topic exists by using a cache to store the list of all topics.
+     * If the cache entry has expired or the cache is empty, it queries the Kafka AdminClient for the topic list.
+     * <p>
+     * The cached list has a TTL (time-to-live) of one minute, after which it will be refreshed
+     * from Kafka on the next access.
+     *
+     * @param topic the name of the Kafka topic to check
+     * @return {@code true} if the topic exists, otherwise {@code false}.
+     * @throws RuntimeException if there is an error fetching the topic list or checking if the topic exists.
+     */
+    public boolean isTopicExists(String topic) {
+        try {
+            // Retrieve the list of topics from the cache
+            Set<String> topics = topicListCache.get(TOPIC_LIST_KEY, this::fetchTopicListFromKafka);
+
+            // Check if the given topic is in the cached list
+            return topics.contains(topic);
+        } catch (ExecutionException e) {
+            _log.error("Failed to check if topic exists: {}", topic, e);
+            throw new RuntimeException("Error checking if topic exists", e);
+        }
+    }
+
+    /**
+     * Fetches the list of all topic names from the Kafka AdminClient.
+     * This method is called only when the cache is expired or empty.
+     *
+     * @return a Set containing all topic names.
+     * @throws ExecutionException if there is an error fetching the topic list from Kafka.
+     */
+    private Set<String> fetchTopicListFromKafka() throws ExecutionException {
+        try {
+            _log.info("Fetching topic list from Kafka");
+            return adminClient.listTopics().names().get();
+        } catch (Exception e) {
+            _log.error("Error fetching topic list from Kafka", e);
+            throw new ExecutionException(e);
+        }
+    }
+
+    /**
+     * Closes the AdminClient to release resources.
+     */
+    public void close() {
+        if (adminClient != null) {
+            adminClient.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java
new file mode 100644
index 000000000..255bc53fb
--- /dev/null
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java
@@ -0,0 +1,157 @@
+package com.bazaarvoice.emodb.sor.kafka;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder;
+import com.amazonaws.services.simplesystemsmanagement.model.AWSSimpleSystemsManagementException;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult;
+import com.amazonaws.services.simplesystemsmanagement.model.Parameter;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class KafkaConfig {
+    private static String bootstrapServersConfig;
+    private static String batchSizeConfig;
+    private static String retriesConfig;
+    private static String lingerMsConfig;
+    private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
+    // Static SSM client and configuration using AWS SDK v1
+    private static final AWSSimpleSystemsManagement ssmClient = AWSSimpleSystemsManagementClientBuilder
+            .standard()
+            .build();
+
+    static {
+        try {
+            final String UNIVERSE = getUniverseFromEnv();
+            // Load configurations from SSM during static initialization
+            Map<String, String> parameterValues = getParameterValues(
+                    Arrays.asList(
+                            "/" + UNIVERSE + "/emodb/kafka/batchSize",
+                            "/" + UNIVERSE + "/emodb/kafka/retries",
+                            "/" + UNIVERSE + "/emodb/kafka/lingerMs",
+                            "/" + UNIVERSE + "/emodb/kafka/bootstrapServers"
+                    )
+            );
+
+            // Set configurations, falling back to defaults where a parameter is not present
+            // Sets the batch size for the Kafka producer, which controls the amount of data to batch before sending.
+            batchSizeConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/batchSize", "16384");
+
+            // Sets the number of retry attempts for failed Kafka message sends.
+            retriesConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/retries", "3");
+
+            // Sets the number of milliseconds a producer is willing to wait before sending a batch out.
+            lingerMsConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/lingerMs", "1");
+
+            // Configures the Kafka broker addresses for producer connections. No default; this parameter is required.
+            bootstrapServersConfig = parameterValues.get("/" + UNIVERSE + "/emodb/kafka/bootstrapServers");
+
+            logger.info("Kafka configurations loaded successfully from SSM.");
+        } catch (AmazonServiceException e) {
+            logger.error("Failed to load configurations from SSM.", e);
+            throw e;
+        } catch (Exception e) {
+            logger.error("Unexpected error occurred while loading configurations from SSM.", e);
+            throw e;
+        }
+    }
+
+    public static String getUniverseFromEnv() {
+        String filePath = "/etc/environment";
+        logger.info("Reading environment file: " + filePath);
+        Properties environmentProps = new Properties();
+
+        try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                // Skip empty lines or comments
+                if (line.trim().isEmpty() || line.trim().startsWith("#")) {
+                    continue;
+                }
+                // Split the line into a key-value pair
+                String[] parts = line.split("=", 2);
+                logger.info("parts: " + Arrays.toString(parts));
+                if (parts.length == 2) {
+                    String key = parts[0].trim();
+                    String value = parts[1].trim();
+                    // Remove any surrounding quotes from the value
+                    value = value.replace("\"", "");
+                    environmentProps.put(key, value);
+                }
+            }
+            // Access the environment variables
+            return environmentProps.getProperty("UNIVERSE");
+        } catch (IOException e) {
+            logger.error("Error reading environment file: " + e.getMessage());
+            throw new RuntimeException("Error reading environment file: " + e.getMessage());
+        }
+    }
+
+    // Fetch parameters from AWS SSM using AWS SDK v1
+    private static Map<String, String> getParameterValues(List<String> parameterNames) {
+        try {
+            GetParametersRequest request = new GetParametersRequest()
+                    .withNames(parameterNames)
+                    .withWithDecryption(true);
+
+            GetParametersResult response = ssmClient.getParameters(request);
+
+            return response.getParameters().stream()
+                    .collect(Collectors.toMap(Parameter::getName, Parameter::getValue));
+        } catch (AWSSimpleSystemsManagementException e) {
+            logger.error("Error fetching parameters from SSM.", e);
+            throw e; // Rethrow or handle the exception if necessary
+        }
+    }
+
+    // Kafka producer properties
+    public static Properties getProducerProps() {
+        Properties producerProps = new Properties();
+
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retriesConfig));
+        producerProps.put(ProducerConfig.LINGER_MS_CONFIG, Integer.parseInt(lingerMsConfig));
+        producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.parseInt(batchSizeConfig));
+        producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // Default buffer memory setting
+        logger.info("Kafka Producer properties initialized.");
+        return producerProps;
+    }
+
+    // Kafka admin properties
+    public static Properties getAdminProps() {
+        Properties adminProps = new Properties();
+
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
+        logger.info("Kafka Admin properties initialized.");
+        return adminProps;
+    }
+
+    // Ensure the SSM client is closed when the application shuts down
+    public static void shutdown() {
+        if (ssmClient != null) {
+            try {
+                ssmClient.shutdown();
+                logger.info("SSM client closed successfully.");
+            } catch (Exception e) {
+                logger.error("Error while closing SSM client.", e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaProducerService.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaProducerService.java
new file mode 100644
index 000000000..4cb587cf2
--- /dev/null
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaProducerService.java
@@ -0,0 +1,66 @@
+package com.bazaarvoice.emodb.sor.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Future;
+
+public class KafkaProducerService {
+    private static final Logger _log = LoggerFactory.getLogger(KafkaProducerService.class);
+    private final KafkaProducer<String, String> producer;
+
+    public KafkaProducerService() {
+        this.producer = new KafkaProducer<>(KafkaConfig.getProducerProps());
+        _log.info("KafkaProducerService initialized with producer properties: {}", KafkaConfig.getProducerProps());
+    }
+
+    /**
+     * Sends each message from the collection to the specified Kafka topic separately.
+     *
+     * @param topic     The Kafka topic.
+     * @param events    The collection of messages to be sent.
+     * @param queueType The type of queue the topic backs (currently unused).
+     */
+    public <T> void sendMessages(String topic, Collection<T> events, String queueType) {
+        LocalDateTime startTime = LocalDateTime.now();
+        _log.info("Sending {} messages to topic '{}'", events.size(), topic);
+        List<Future<RecordMetadata>> futures = new ArrayList<>();
+        // Use async send and collect the futures
+        for (T event : events) {
+            futures.add(producer.send(new ProducerRecord<>(topic, event.toString())));
+        }
+
+        // Wait for all futures to complete
+        for (Future<RecordMetadata> future : futures) {
+            try {
+                future.get(); // Only blocks if a future is not yet complete
+            } catch (Exception e) {
+                _log.error("Error while sending message to Kafka: {}", e.getMessage());
+                throw new RuntimeException("Error sending messages to Kafka", e);
+            }
+        }
+        _log.info("Finished sending messages to topic '{}'; time taken: {} ms", topic, Duration.between(startTime, LocalDateTime.now()).toMillis());
+    }
+
+    /**
+     * Closes the producer to release resources.
+     */
+    public void close() {
+        _log.info("Closing Kafka producer.");
+        try {
+            producer.flush();
+            producer.close();
+        } catch (Exception e) {
+            _log.error("Error while closing Kafka producer: ", e);
+            throw e;
+        }
+    }
+}
\ No newline at end of file
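Reviewer note: a quick end-to-end sketch of how the two services above compose (topic name, partition count, replication factor, and payload are illustrative, not part of this change; Guava's ImmutableList is assumed on the classpath). createTopicIfNotExists reports whether the topic was already present, and sendMessages blocks until every record is acknowledged, so a single failure aborts the whole batch:

    KafkaAdminService adminService = new KafkaAdminService();
    KafkaProducerService producerService = new KafkaProducerService();

    // Idempotent provisioning: returns true if the topic already existed.
    Boolean existed = adminService.createTopicIfNotExists("emodb.sor.updates", 3, (short) 3, "sor");

    // Each element becomes its own record; toString() is the wire format.
    producerService.sendMessages("emodb.sor.updates", ImmutableList.of("{\"key1\":\"value1\"}"), "sor");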
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/ssm/ParameterStoreUtil.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/ssm/ParameterStoreUtil.java
new file mode 100644
index 000000000..693901b26
--- /dev/null
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/ssm/ParameterStoreUtil.java
@@ -0,0 +1,120 @@
+package com.bazaarvoice.emodb.sor.ssm;
+
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder;
+import com.amazonaws.services.simplesystemsmanagement.model.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility class for interacting with AWS Parameter Store using AWS SDK v1.
+ */
+public class ParameterStoreUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(ParameterStoreUtil.class);
+    private final AWSSimpleSystemsManagement ssmClient;
+
+    /**
+     * Constructor to initialize the SSM client
+     */
+    public ParameterStoreUtil() {
+        // Create SSM client with default credentials and region
+        ssmClient = AWSSimpleSystemsManagementClientBuilder.standard()
+                .build();
+    }
+
+    /**
+     * Fetches a parameter from AWS Parameter Store.
+     *
+     * @param parameterName The name of the parameter to fetch
+     * @return The value of the parameter
+     * @throws IllegalArgumentException If the parameterName is null or empty
+     */
+    public String getParameter(String parameterName) {
+        if (parameterName == null || parameterName.isEmpty()) {
+            logger.error("Parameter name cannot be null or empty");
+            throw new IllegalArgumentException("Parameter name cannot be null or empty");
+        }
+
+        try {
+            GetParameterRequest request = new GetParameterRequest().withName(parameterName);
+            GetParameterResult result = ssmClient.getParameter(request);
+            return result.getParameter().getValue();
+
+        } catch (ParameterNotFoundException e) {
+            logger.error("Parameter not found: {}", parameterName, e);
+            throw new RuntimeException("Parameter not found: " + parameterName, e);
+
+        } catch (AWSSimpleSystemsManagementException e) {
+            logger.error("Error fetching parameter from AWS SSM: {}", e.getMessage(), e);
+            throw new RuntimeException("Error fetching parameter from AWS SSM: " + parameterName, e);
+
+        } catch (Exception e) {
+            logger.error("Unexpected error while fetching parameter: {}", parameterName, e);
+            throw new RuntimeException("Unexpected error fetching parameter: " + parameterName, e);
+        }
+    }
+
+    /**
+     * Fetches multiple parameters from AWS Parameter Store in a batch.
+     *
+     * @param parameterNames The list of parameter names to fetch
+     * @return A map of parameter names to their values
+     * @throws IllegalArgumentException If the parameterNames list is null or empty
+     */
+    public Map<String, String> getParameters(List<String> parameterNames) {
+        if (parameterNames == null || parameterNames.isEmpty()) {
+            logger.error("Parameter names list cannot be null or empty");
+            throw new IllegalArgumentException("Parameter names list cannot be null or empty");
+        }
+
+        try {
+            GetParametersRequest request = new GetParametersRequest().withNames(parameterNames);
+            GetParametersResult result = ssmClient.getParameters(request);
+
+            // Map the result to a Map of parameter names and values
+            Map<String, String> parameters = new HashMap<>();
+            result.getParameters().forEach(param -> parameters.put(param.getName(), param.getValue()));
+
+            // Log any parameters that were not found
+            if (!result.getInvalidParameters().isEmpty()) {
+                logger.warn("The following parameters were not found: {}", result.getInvalidParameters());
+            }
+
+            return parameters;
+
+        } catch (AWSSimpleSystemsManagementException e) {
+            logger.error("Error fetching parameters from AWS SSM: {}", e.getMessage(), e);
+            throw new RuntimeException("Error fetching parameters from AWS SSM: " + parameterNames, e);
+
+        } catch (Exception e) {
+            logger.error("Unexpected error while fetching parameters: {}", parameterNames, e);
+            throw new RuntimeException("Unexpected error fetching parameters: " + parameterNames, e);
+        }
+    }
+
+    public Long updateParameter(String key, String value) {
+        try {
+            if (key == null || key.trim().isEmpty()) {
+                logger.error("Parameter name cannot be null or blank");
+                throw new IllegalArgumentException("Parameter name cannot be null or blank");
+            }
+
+            PutParameterRequest request = new PutParameterRequest().withName(key).withValue(value).withOverwrite(true);
+
+            PutParameterResult response = ssmClient.putParameter(request);
+            logger.info("Successfully updated parameter: " + key + " with value: " + value + ", Update Version: " + response.getVersion());
+            return response.getVersion();
+        } catch (Exception e) {
+            logger.error("Failed to update parameter: " + key + " with value: " + value, e);
+            throw new RuntimeException("Unexpected error updating parameter: " + key + " with value: " + value, e);
+        }
+    }
+
+}
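Reviewer note: KafkaConfig's static initializer (above) hard-requires the four /{UNIVERSE}/emodb/kafka/* parameters, and bootstrapServers has no default. A hedged sketch of seeding them with this utility for a hypothetical universe named local — the broker addresses and values are illustrative only:

    ParameterStoreUtil ssmUtil = new ParameterStoreUtil();
    ssmUtil.updateParameter("/local/emodb/kafka/bootstrapServers", "b-1.kafka.local:9092,b-2.kafka.local:9092");
    ssmUtil.updateParameter("/local/emodb/kafka/batchSize", "16384");
    ssmUtil.updateParameter("/local/emodb/kafka/retries", "3");
    ssmUtil.updateParameter("/local/emodb/kafka/lingerMs", "1");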
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java
index 0cb2483f2..7ad3205cb 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java
@@ -1,7 +1,5 @@
 package com.bazaarvoice.emodb.sor.core;
 
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.AuditBuilder;
 import com.bazaarvoice.emodb.sor.api.Change;
 import com.bazaarvoice.emodb.sor.api.ChangeBuilder;
@@ -18,6 +16,7 @@
 import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
 import com.bazaarvoice.emodb.sor.delta.Delta;
 import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.test.SystemClock;
 import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs;
 import com.bazaarvoice.emodb.table.db.Table;
@@ -487,7 +486,7 @@ public void compact(Table table, String key, UUID compactionKey, Compaction comp
             }
         };
 
-        final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry(), new KafkaProducerService());
 
         // Create a table for our test
         dataStore.createTable(tableName,
@@ -573,7 +572,7 @@ public Record read(Key key, ReadConsistency ignored) {
         // Configure the data DAO to read 10 columns initially, causing other column reads to be read lazily
         dataDAO.setColumnBatchSize(10);
 
-        final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry(), new KafkaProducerService());
 
         // Create a table for our test
         dataStore.createTable(tableName,
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java
index 6fabfcf1e..5cff68b69 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java
@@ -1,7 +1,5 @@
 package com.bazaarvoice.emodb.sor.core;
 
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.Audit;
 import com.bazaarvoice.emodb.sor.api.AuditBuilder;
 import com.bazaarvoice.emodb.sor.api.Change;
@@ -16,6 +14,7 @@
 import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
 import com.bazaarvoice.emodb.sor.delta.Delta;
 import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.test.SystemClock;
 import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs;
 import com.codahale.metrics.MetricRegistry;
@@ -50,7 +49,7 @@ public class DataStoreTest {
 
     @Test
     public void testDeltas() throws Exception {
-        DataStore store = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        DataStore store = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
         TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
 
         assertFalse(store.getTableExists(TABLE));
@@ -170,7 +169,7 @@ public void testDeltas() throws Exception {
 
     @Test
     public void testRecordTimestamps() throws Exception {
-        DataStore store = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        DataStore store = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
         TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
 
         assertFalse(store.getTableExists(TABLE));
@@ -265,7 +264,7 @@ record = store.get(TABLE, KEY1);
 
     @Test
     public void testRecordTimestampsWithEventTags() throws Exception {
-        DataStore store = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        DataStore store = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
         TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
 
         assertFalse(store.getTableExists(TABLE));
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java
index b32e194d9..57cba50ea 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java
@@ -1,7 +1,5 @@
 package com.bazaarvoice.emodb.sor.core;
 
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.AuditBuilder;
 import com.bazaarvoice.emodb.sor.api.DataStore;
 import com.bazaarvoice.emodb.sor.api.TableOptionsBuilder;
@@ -12,6 +10,7 @@
 import com.bazaarvoice.emodb.sor.db.astyanax.ChangeEncoder;
 import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
 import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs;
 import com.bazaarvoice.emodb.table.db.Table;
 import com.bazaarvoice.emodb.table.db.astyanax.PlacementCache;
@@ -45,7 +44,7 @@ public List getSplits(Table table, int recordsPerSplit, int localResplit
             }
         };
 
-        DataStore dataStore = new InMemoryDataStore(dataDao, new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        DataStore dataStore = new InMemoryDataStore(dataDao, new MetricRegistry(), new KafkaProducerService());
 
         dataStore.createTable("table", new TableOptionsBuilder().setPlacement("default").build(),
                 Collections.emptyMap(), new AuditBuilder().build());
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java
index 8c14f5e7e..06e2ce099 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java
@@ -2,8 +2,6 @@
 
 import com.bazaarvoice.emodb.common.json.JsonHelper;
 import com.bazaarvoice.emodb.common.uuid.TimeUUIDs;
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.Audit;
 import com.bazaarvoice.emodb.sor.api.AuditBuilder;
 import com.bazaarvoice.emodb.sor.api.Change;
@@ -26,6 +24,7 @@
 import com.bazaarvoice.emodb.sor.db.test.DeltaClusteringKey;
 import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
 import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.log.NullSlowQueryLog;
 import com.bazaarvoice.emodb.table.db.Table;
 import com.bazaarvoice.emodb.table.db.test.InMemoryTableDAO;
@@ -67,7 +66,7 @@ public void testRedundantDeltas() throws Exception {
         DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
                 new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
                 Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
-                new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), new KafkaProducerService(), mock(BaseEventStore.class));
+                new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), new KafkaProducerService());
 
         TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
         store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -124,7 +123,7 @@ public void testMinUUIDDelta() throws Exception {
         DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
                 new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
                 Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
-                new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), new KafkaProducerService(), mock(BaseEventStore.class));
+                new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), new KafkaProducerService());
 
         TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
         store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -161,7 +160,7 @@ public void testRedundancyWithTags() throws Exception {
         DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
                 new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
                 Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
-                new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), new KafkaProducerService(), mock(BaseEventStore.class));
+                new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), new KafkaProducerService());
 
         TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
         store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -242,7 +241,7 @@ public void testTagsForNestedMapDeltas() {
         DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
                 new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
                 Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
-                new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), new KafkaProducerService(), mock(BaseEventStore.class));
+                new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), new KafkaProducerService());
 
         TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
         store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -262,7 +261,7 @@ public void testRedundancyWithCompactionAndUnchangedTag() throws Exception {
         DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
                 new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
                 Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
-                new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), new KafkaProducerService(), mock(BaseEventStore.class));
+                new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), new KafkaProducerService());
 
         TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
         store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -339,7 +338,7 @@ public void testPartialCompactionWithNoRedundancy() throws Exception {
         DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), tableDao, dataDao, dataDao,
                 new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
                 Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
-                new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), new KafkaProducerService(), mock(BaseEventStore.class));
+                new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), new KafkaProducerService());
 
         TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
         store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -411,7 +410,7 @@ public void testPartialCompactionWithRedundancy() throws Exception {
         DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), tableDao, dataDao, dataDao,
                 new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
                 Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
-                new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), new KafkaProducerService(), mock(BaseEventStore.class));
+                new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), new KafkaProducerService());
 
         TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
         store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java
index 765ce00d1..3c1a7ce86 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java
@@ -1,8 +1,6 @@
 package com.bazaarvoice.emodb.sor.core;
 
 import com.bazaarvoice.emodb.common.uuid.TimeUUIDs;
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.AuditBuilder;
 import com.bazaarvoice.emodb.sor.api.DataStore;
 import com.bazaarvoice.emodb.sor.api.TableOptionsBuilder;
@@ -10,6 +8,7 @@
 import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
 import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
 import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.test.SystemClock;
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.collect.ImmutableMap;
@@ -37,7 +36,7 @@ public class SorUpdateTest {
     public void SetupTest() {
         final InMemoryDataReaderDAO dataDAO = new InMemoryDataReaderDAO();
         _eventWriterRegistry = new DatabusEventWriterRegistry();
-        _dataStore = new InMemoryDataStore(_eventWriterRegistry, dataDAO, new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        _dataStore = new InMemoryDataStore(_eventWriterRegistry, dataDAO, new MetricRegistry(), new KafkaProducerService());
 
         // Create a table for our test
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java
index 35f9d5c86..d747a0279 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java
@@ -1,8 +1,6 @@
 package com.bazaarvoice.emodb.sor.test;
 
 import com.bazaarvoice.emodb.common.dropwizard.lifecycle.SimpleLifeCycleRegistry;
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.DataStore;
 import com.bazaarvoice.emodb.sor.audit.DiscardingAuditWriter;
 import com.bazaarvoice.emodb.sor.compactioncontrol.InMemoryCompactionControlSource;
@@ -13,6 +11,7 @@
 import com.bazaarvoice.emodb.sor.core.test.InMemoryHistoryStore;
 import com.bazaarvoice.emodb.sor.core.test.InMemoryMapStore;
 import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.log.NullSlowQueryLog;
 import com.bazaarvoice.emodb.table.db.TableDAO;
 import com.bazaarvoice.emodb.table.db.test.InMemoryTableDAO;
@@ -67,12 +66,12 @@ public MultiDCDataStores(int numDCs, boolean asyncCompacter, MetricRegistry metr
             if (asyncCompacter) {
                 _stores[i] = new DefaultDataStore(new SimpleLifeCycleRegistry(), metricRegistry, new DatabusEventWriterRegistry(), _tableDao,
                         _inMemoryDaos[i].setHistoryStore(_historyStores[i]), _replDaos[i], new NullSlowQueryLog(), _historyStores[i],
-                        Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), new DiscardingAuditWriter(), new InMemoryMapStore<>(), Clock.systemUTC(), new KafkaProducerService(), mock(BaseEventStore.class));
+                        Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), new DiscardingAuditWriter(), new InMemoryMapStore<>(), Clock.systemUTC(), new KafkaProducerService());
             } else {
                 _stores[i] = new DefaultDataStore(new DatabusEventWriterRegistry(), _tableDao, _inMemoryDaos[i].setHistoryStore(_historyStores[i]),
                         _replDaos[i], new NullSlowQueryLog(), MoreExecutors.newDirectExecutorService(), _historyStores[i],
                         Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
-                        new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC(), new KafkaProducerService(), mock(BaseEventStore.class));
+                        new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC(), new KafkaProducerService());
             }
         }
     }
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java b/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java
index bb6009e0f..2b5c15792 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java
@@ -11,8 +11,6 @@
 import com.bazaarvoice.emodb.common.zookeeper.store.ValueStore;
 import com.bazaarvoice.emodb.datacenter.api.DataCenter;
 import com.bazaarvoice.emodb.datacenter.core.DefaultDataCenter;
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.Audit;
 import com.bazaarvoice.emodb.sor.api.AuditBuilder;
 import com.bazaarvoice.emodb.sor.api.FacadeExistsException;
@@ -29,6 +27,7 @@
 import com.bazaarvoice.emodb.sor.delta.Delta;
 import com.bazaarvoice.emodb.sor.delta.Deltas;
 import com.bazaarvoice.emodb.sor.delta.MapDeltaBuilder;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.table.db.MoveType;
 import com.bazaarvoice.emodb.table.db.Table;
 import com.bazaarvoice.emodb.table.db.TableBackingStore;
@@ -1983,7 +1982,7 @@ dataCenter, mock(RateLimiterCache.class), dataCopyDAO, dataPurgeDAO,
     }
 
     private InMemoryDataStore newBackingStore(MetricRegistry metricRegistry) {
-        InMemoryDataStore store = new InMemoryDataStore(metricRegistry, new KafkaProducerService(), mock(BaseEventStore.class));
+        InMemoryDataStore store = new InMemoryDataStore(metricRegistry, new KafkaProducerService());
         store.createTable("__system:table", newOptions(PL_GLOBAL), ImmutableMap.of(), newAudit());
         store.createTable("__system:table_uuid", newOptions(PL_GLOBAL), ImmutableMap.of(), newAudit());
         store.createTable("__system:table_unpublished_databus_events", newOptions(PL_GLOBAL), ImmutableMap.of(), newAudit());
diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/purge/PurgeTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/purge/PurgeTest.java
index 4bdc43f9d..65f8ad237 100644
--- a/web/src/test/java/com/bazaarvoice/emodb/web/purge/PurgeTest.java
+++ b/web/src/test/java/com/bazaarvoice/emodb/web/purge/PurgeTest.java
@@ -2,7 +2,6 @@
 
 import com.bazaarvoice.emodb.common.dropwizard.lifecycle.LifeCycleRegistry;
 import com.bazaarvoice.emodb.common.uuid.TimeUUIDs;
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
 import com.bazaarvoice.emodb.job.api.JobIdentifier;
 import com.bazaarvoice.emodb.job.api.JobRequest;
 import com.bazaarvoice.emodb.job.api.JobStatus;
@@ -12,7 +11,6 @@
 import com.bazaarvoice.emodb.job.handler.DefaultJobHandlerRegistry;
 import com.bazaarvoice.emodb.job.service.DefaultJobService;
 import com.bazaarvoice.emodb.queue.api.QueueService;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.Audit;
 import com.bazaarvoice.emodb.sor.api.AuditBuilder;
 import com.bazaarvoice.emodb.sor.api.CompactionControlSource;
@@ -26,6 +24,7 @@
 import com.bazaarvoice.emodb.sor.core.PurgeResult;
 import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
 import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.web.resources.sor.AuditParam;
 import com.bazaarvoice.emodb.web.resources.sor.DataStoreResource1;
 import com.bazaarvoice.emodb.web.throttling.UnlimitedDataStoreUpdateThrottler;
@@ -86,7 +85,7 @@ public void setUp() throws Exception {
                 lifeCycleRegistry, _queueService, "testqueue", _jobHandlerRegistry,
                 _jobStatusDAO, _curator, 1, Duration.ZERO, 100, Duration.ofHours(1));
 
-        _store = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        _store = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
 
         _dataStoreResource = new DataStoreResource1(_store, new DefaultDataStoreAsync(_store, _service, _jobHandlerRegistry),
                 mock(CompactionControlSource.class), new UnlimitedDataStoreUpdateThrottler());
diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/ScanUploaderTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/ScanUploaderTest.java
index d5a4eaa12..a0693d3ae 100644
--- a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/ScanUploaderTest.java
+++ b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/ScanUploaderTest.java
@@ -12,11 +12,9 @@
 import com.bazaarvoice.emodb.common.json.JsonHelper;
 import com.bazaarvoice.emodb.datacenter.api.DataCenter;
 import com.bazaarvoice.emodb.datacenter.api.DataCenters;
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
 import com.bazaarvoice.emodb.plugin.stash.StashMetadata;
 import com.bazaarvoice.emodb.plugin.stash.StashStateListener;
 import com.bazaarvoice.emodb.queue.core.ByteBufferInputStream;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.CompactionControlSource;
 import com.bazaarvoice.emodb.sor.api.Intrinsic;
 import com.bazaarvoice.emodb.sor.api.ReadConsistency;
@@ -29,6 +27,7 @@
 import com.bazaarvoice.emodb.sor.db.Record;
 import com.bazaarvoice.emodb.sor.db.ScanRange;
 import com.bazaarvoice.emodb.sor.db.ScanRangeSplits;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.table.db.Table;
 import com.bazaarvoice.emodb.table.db.astyanax.AstyanaxStorage;
 import com.bazaarvoice.emodb.web.scanner.control.DistributedScanRangeMonitor;
@@ -424,7 +423,7 @@ public void testScanUploadFromExistingScan() throws Exception {
         MetricRegistry metricRegistry = new MetricRegistry();
         KafkaProducerService kafkaProducerService = new KafkaProducerService();
         // Use an in-memory data store but override the default splits operation to return 4 splits for the test placement
-        InMemoryDataStore dataStore = spy(new InMemoryDataStore(metricRegistry, kafkaProducerService, mock(BaseEventStore.class)));
+        InMemoryDataStore dataStore = spy(new InMemoryDataStore(metricRegistry, kafkaProducerService));
         when(dataStore.getScanRangeSplits("app_global:default", 1000000, Optional.empty()))
                 .thenReturn(new ScanRangeSplits(ImmutableList.of(
                         createSimpleSplitGroup("00", "40"),
@@ -624,7 +623,7 @@ public void testScanFailureRecovery()
                 Lists.newArrayList(), Lists.newArrayList());
 
         InMemoryScanWorkflow scanWorkflow = new InMemoryScanWorkflow();
-        ScanStatusDAO scanStatusDAO = new DataStoreScanStatusDAO(new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class)), "scan_table", "app_global:sys");
+        ScanStatusDAO scanStatusDAO = new DataStoreScanStatusDAO(new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService()), "scan_table", "app_global:sys");
         LocalScanUploadMonitor monitor = new LocalScanUploadMonitor(scanWorkflow, scanStatusDAO,
                 mock(ScanWriterGenerator.class), mock(StashStateListener.class), mock(ScanCountListener.class),
                 mock(DataTools.class), new InMemoryCompactionControlSource(), mock(DataCenters.class));
diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreScanStatusDAOTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreScanStatusDAOTest.java
index 2581db11f..0ee31fe2e 100644
--- a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreScanStatusDAOTest.java
+++ b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreScanStatusDAOTest.java
@@ -1,13 +1,12 @@
 package com.bazaarvoice.emodb.web.scanner.scanstatus;
 
 import com.bazaarvoice.emodb.common.uuid.TimeUUIDs;
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.AuditBuilder;
 import com.bazaarvoice.emodb.sor.api.DataStore;
 import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
 import com.bazaarvoice.emodb.sor.db.ScanRange;
 import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.web.scanner.ScanDestination;
 import com.bazaarvoice.emodb.web.scanner.ScanOptions;
 import com.codahale.metrics.MetricRegistry;
@@ -36,7 +35,7 @@ public class DataStoreScanStatusDAOTest {
 
     @BeforeMethod
     public void setUp() {
-        _dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        _dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
         _dao = new DataStoreScanStatusDAO(_dataStore, "scan_table", "app_global:sys");
     }
diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreStashRequestDAOTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreStashRequestDAOTest.java
index a5df6effb..04414530a 100644
--- a/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreStashRequestDAOTest.java
+++ b/web/src/test/java/com/bazaarvoice/emodb/web/scanner/scanstatus/DataStoreStashRequestDAOTest.java
@@ -1,9 +1,8 @@
 package com.bazaarvoice.emodb.web.scanner.scanstatus;
 
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.DataStore;
 import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.collect.ImmutableSet;
 import org.testng.annotations.BeforeMethod;
@@ -22,7 +21,7 @@ public class DataStoreStashRequestDAOTest {
 
     @BeforeMethod
     public void setUp() {
-        _dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        _dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
         _dao = new DataStoreStashRequestDAO(_dataStore, "request_table", "app_global:sys");
     }
diff --git a/web/src/test/java/com/bazaarvoice/emodb/web/settings/SettingsManagerTest.java b/web/src/test/java/com/bazaarvoice/emodb/web/settings/SettingsManagerTest.java
index 1ad088d5e..a213da5ae 100644
--- a/web/src/test/java/com/bazaarvoice/emodb/web/settings/SettingsManagerTest.java
+++ b/web/src/test/java/com/bazaarvoice/emodb/web/settings/SettingsManagerTest.java
@@ -3,11 +3,10 @@
 import com.bazaarvoice.emodb.cachemgr.api.CacheHandle;
 import com.bazaarvoice.emodb.cachemgr.api.CacheRegistry;
 import com.bazaarvoice.emodb.cachemgr.api.InvalidationScope;
-import com.bazaarvoice.emodb.event.api.BaseEventStore;
-import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
 import com.bazaarvoice.emodb.sor.api.DataStore;
 import com.bazaarvoice.emodb.sor.api.Intrinsic;
 import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.cache.Cache;
 import com.google.inject.util.Providers;
@@ -34,7 +33,7 @@ public class SettingsManagerTest {
 
     @BeforeMethod
     public void setUp() {
-        _dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
+        _dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
         _cacheRegistry = mock(CacheRegistry.class);
         _cacheHandle = mock(CacheHandle.class);
         when(_cacheRegistry.register(eq("settings"), any(Cache.class), eq(true))).thenReturn(_cacheHandle);
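Reviewer note: nothing in this change wires the new services into a lifecycle — KafkaProducerService.close(), KafkaAdminService.close(), and KafkaConfig.shutdown() exist but have no caller here. A hedged sketch of one way to release the resources on shutdown, assuming producerService and adminService are the instances created above (a plain JVM shutdown hook; EmoDB's LifeCycleRegistry would be the more idiomatic home):

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        producerService.close();  // flushes buffered records, then closes the KafkaProducer
        adminService.close();     // closes the shared AdminClient
        KafkaConfig.shutdown();   // shuts down the static SSM client
    }));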