diff --git a/queue/pom.xml b/queue/pom.xml index 2725dd86f..973cc8919 100644 --- a/queue/pom.xml +++ b/queue/pom.xml @@ -135,5 +135,10 @@ com.amazonaws aws-java-sdk-ssm + + com.datadoghq + java-dogstatsd-client + 2.3 + diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java index e7a8e7a13..b0f8593e1 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java @@ -44,6 +44,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import com.timgroup.statsd.StatsDClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,12 +63,13 @@ abstract class AbstractQueueService implements BaseQueueService { public static final int MAX_MESSAGE_SIZE_IN_BYTES = 30 * 1024; private final StepFunctionService stepFunctionService; private final ParameterStoreUtil parameterStoreUtil; + private final StatsDClient _statsDClient; private static final String UNIVERSE = KafkaConfig.getUniverseFromEnv(); protected AbstractQueueService(BaseEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, JobType moveQueueJobType, - Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) { + Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) { _eventStore = eventStore; _jobService = jobService; _moveQueueJobType = moveQueueJobType; @@ -75,6 +77,7 @@ protected AbstractQueueService(BaseEventStore eventStore, JobService jobService, this.producerService = producerService; this.stepFunctionService = stepFunctionService; this.parameterStoreUtil = new ParameterStoreUtil(); + _statsDClient = statsDClient; registerMoveQueueJobHandler(jobHandlerRegistry); @@ -192,6 +195,7 @@ public void sendAll(Map> messagesByQueue) { validateMessage(message); events.add(message.toString()); } + _statsDClient.recordGaugeValue("queue.messages.size", events.size(), "queue:" + queue); builder.putAll(queue, events); } @@ -229,6 +233,7 @@ public void sendAll(String queue, Collection messages, boolean fromKafka) { "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size"); events.add(messageByteBuffer); } + _statsDClient.recordGaugeValue("queue.messages.size", events.size(), "queue:" + queue); builder.putAll(queue, events); Multimap eventsByChannel = builder.build(); diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java index 9ec8d3660..f3820f378 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java @@ -8,13 +8,14 @@ import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; import com.google.inject.Inject; +import com.timgroup.statsd.StatsDClient; import java.time.Clock; public class DefaultDedupQueueService extends AbstractQueueService implements DedupQueueService { @Inject public DefaultDedupQueueService(DedupEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, - Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) { - super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock,adminService,producerService,stepFunctionService ); + Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) { + super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock,adminService,producerService,stepFunctionService, statsDClient ); } } diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java index 524ca5003..8a8a72ad8 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java @@ -8,13 +8,14 @@ import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; import com.google.inject.Inject; +import com.timgroup.statsd.StatsDClient; import java.time.Clock; public class DefaultQueueService extends AbstractQueueService implements QueueService { @Inject public DefaultQueueService(EventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, - Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) { - super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock,adminService, producerService,stepFunctionService); + Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) { + super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock,adminService, producerService,stepFunctionService, statsDClient); } } diff --git a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java index 0ac9a0ed8..36d4232d4 100644 --- a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java +++ b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java @@ -7,6 +7,7 @@ import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService; import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; +import com.timgroup.statsd.StatsDClient; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.testng.annotations.Test; @@ -41,7 +42,7 @@ public void testSizeCache() { BaseEventStore mockEventStore = mock(BaseEventStore.class); AbstractQueueService queueService = new AbstractQueueService(mockEventStore, mock(JobService.class), - mock(JobHandlerRegistry.class), mock(JobType.class), clock, mock(KafkaAdminService.class), mock(KafkaProducerService.class), mock(StepFunctionService.class)){}; + mock(JobHandlerRegistry.class), mock(JobType.class), clock, mock(KafkaAdminService.class), mock(KafkaProducerService.class), mock(StepFunctionService.class), mock(StatsDClient.class)){}; // At limit=500, size estimate should be at 4800 // At limit=50, size estimate should be at 5000