diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java index b42d33b02..6cbc958fe 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java @@ -34,6 +34,7 @@ import com.google.inject.Provides; import com.google.inject.Singleton; import com.google.inject.TypeLiteral; +import com.timgroup.statsd.StatsDClient; import org.apache.curator.framework.CuratorFramework; import java.time.Clock; @@ -76,6 +77,7 @@ public QueueModule(MetricRegistry metricRegistry) { @Override protected void configure() { bind(CassandraFactory.class).asEagerSingleton(); + bind(StatsDClient.class).asEagerSingleton(); // Event Store bind(ChannelConfiguration.class).to(QueueChannelConfiguration.class).asEagerSingleton(); diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/DataStoreModule.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/DataStoreModule.java index fda127a19..5c5faffeb 100644 --- a/sor/src/main/java/com/bazaarvoice/emodb/sor/DataStoreModule.java +++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/DataStoreModule.java @@ -60,6 +60,7 @@ import com.bazaarvoice.emodb.sor.db.cql.CqlForMultiGets; import com.bazaarvoice.emodb.sor.db.cql.CqlForScans; import com.bazaarvoice.emodb.sor.db.cql.SorCqlSettingsTask; +import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService; import com.bazaarvoice.emodb.sor.log.LogbackSlowQueryLogProvider; import com.bazaarvoice.emodb.sor.log.SlowQueryLog; import com.bazaarvoice.emodb.sor.log.SlowQueryLogConfiguration; @@ -195,6 +196,7 @@ protected void configure() { bind(SlowQueryLog.class).toProvider(LogbackSlowQueryLogProvider.class); bind(HintsConsistencyTimeProvider.class).asEagerSingleton(); bind(MinLagConsistencyTimeProvider.class).asEagerSingleton(); + bind(KafkaProducerService.class); // The web servers are responsible for updating the ZooKeeper full consistency data. CLI tools don't need to. // Enable updating the ZooKeeper full consistency data if specified diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaAdminService.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaAdminService.java deleted file mode 100644 index 8bdccc291..000000000 --- a/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaAdminService.java +++ /dev/null @@ -1,118 +0,0 @@ -package com.bazaarvoice.emodb.sor.kafka; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.NewTopic; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -public class KafkaAdminService { - private static final Logger _log = LoggerFactory.getLogger(KafkaAdminService.class); - private final AdminClient adminClient; - // Cache for the list of all topics with a TTL of 10 minutes - private final Cache> topicListCache = CacheBuilder.newBuilder() - .expireAfterWrite(1, TimeUnit.MINUTES) - .build(); - - private static final String TOPIC_LIST_KEY = "allTopics"; - - - public KafkaAdminService() { - this.adminClient = AdminClient.create(KafkaConfig.getAdminProps()); - } - - /** - * Creates a new Kafka topic with the specified configurations. - * - * @param topic The name of the topic. - * @param numPartitions Number of partitions. - * @param replicationFactor Replication factor. - */ - public Boolean createTopicIfNotExists(String topic, int numPartitions, short replicationFactor, String queueType) { - Boolean isExisting =isTopicExists(topic); - if (! isExisting) { - //create the topic now - NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor); - try { - adminClient.createTopics(Collections.singleton(newTopic)).all().get(); - addToCache(topic); - _log.info("Created topic: {} with numPartitions: {} and replication factor {} ", topic, numPartitions, replicationFactor); - } catch (Exception e) { - _log.error("Error creating topic {}: ", topic, e); - throw new RuntimeException(e); - } - } - return isExisting; - } - public void addToCache(String topic){ - Set topics = topicListCache.getIfPresent(TOPIC_LIST_KEY); - if (topics == null) { - topics = new HashSet<>(); - } else { - // Create a new mutable Set if the existing one is unmodifiable - topics = new HashSet<>(topics); - } - topics.add(topic); - topicListCache.put(TOPIC_LIST_KEY, topics); - _log.info("Added newly created topic to cache: {}", topic); - } - - - /** - * Checks if a Kafka topic exists by using a cache to store the list of all topics. - * If the cache entry has expired or the cache is empty, it queries the Kafka AdminClient for the topic list. - *

- * The cached list has a TTL (Time-To-Live) of 10 minutes, after which it will be refreshed - * from Kafka on the next access. - *

- * - * @param topic the name of the Kafka topic to check - * @return {@code true} if the topic exists, otherwise {@code false}. - * @throws RuntimeException if there is an error fetching the topic list or checking if the topic exists. - */ - public boolean isTopicExists(String topic) { - try { - // Retrieve the list of topics from the cache - Set topics = topicListCache.get(TOPIC_LIST_KEY, this::fetchTopicListFromKafka); - - // Check if the given topic is in the cached list - return topics.contains(topic); - } catch (ExecutionException e) { - _log.error("Failed to check if topic exists: {}", topic, e); - throw new RuntimeException("Error checking if topic exists", e); - } - } - - /** - * Fetches the list of all topic names from Kafka AdminClient. - * This method is called only when the cache is expired or empty. - * - * @return a Set containing all topic names. - * @throws ExecutionException if there is an error fetching the topic list from Kafka. - */ - private Set fetchTopicListFromKafka() throws ExecutionException { - try { - _log.info("Fetching topic list from Kafka"); - return adminClient.listTopics().names().get(); - } catch (Exception e) { - _log.error("Error fetching topic list from Kafka", e); - throw new ExecutionException(e); - } - } - - /** - * Closes the AdminClient to release resources. - */ - public void close() { - if (adminClient != null) { - adminClient.close(); - } - } -} \ No newline at end of file diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java index 255bc53fb..be76309a2 100644 --- a/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java +++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java @@ -134,15 +134,6 @@ public static Properties getProducerProps() { return producerProps; } - // Kafka Admin properties - public static Properties getAdminProps() { - Properties adminProps = new Properties(); - - adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); - logger.info("Kafka Admin properties initialized."); - return adminProps; - } - // Ensure the SSM client is closed when the application shuts down public static void shutdown() { if (ssmClient != null) {