diff --git a/queue/pom.xml b/queue/pom.xml
index 2725dd86f..973cc8919 100644
--- a/queue/pom.xml
+++ b/queue/pom.xml
@@ -135,5 +135,10 @@
com.amazonaws
aws-java-sdk-ssm
+
+ com.datadoghq
+ java-dogstatsd-client
+ 2.3
+
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java
index e7a8e7a13..b0f8593e1 100644
--- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java
@@ -44,6 +44,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
+import com.timgroup.statsd.StatsDClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,12 +63,13 @@ abstract class AbstractQueueService implements BaseQueueService {
public static final int MAX_MESSAGE_SIZE_IN_BYTES = 30 * 1024;
private final StepFunctionService stepFunctionService;
private final ParameterStoreUtil parameterStoreUtil;
+ private final StatsDClient _statsDClient;
private static final String UNIVERSE = KafkaConfig.getUniverseFromEnv();
protected AbstractQueueService(BaseEventStore eventStore, JobService jobService,
JobHandlerRegistry jobHandlerRegistry,
JobType moveQueueJobType,
- Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
+ Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) {
_eventStore = eventStore;
_jobService = jobService;
_moveQueueJobType = moveQueueJobType;
@@ -75,6 +77,7 @@ protected AbstractQueueService(BaseEventStore eventStore, JobService jobService,
this.producerService = producerService;
this.stepFunctionService = stepFunctionService;
this.parameterStoreUtil = new ParameterStoreUtil();
+ _statsDClient = statsDClient;
registerMoveQueueJobHandler(jobHandlerRegistry);
@@ -192,6 +195,7 @@ public void sendAll(Map> messagesByQueue) {
validateMessage(message);
events.add(message.toString());
}
+ _statsDClient.recordGaugeValue("queue.messages.size", events.size(), "queue:" + queue);
builder.putAll(queue, events);
}
@@ -229,6 +233,7 @@ public void sendAll(String queue, Collection> messages, boolean fromKafka) {
"Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size");
events.add(messageByteBuffer);
}
+ _statsDClient.recordGaugeValue("queue.messages.size", events.size(), "queue:" + queue);
builder.putAll(queue, events);
Multimap eventsByChannel = builder.build();
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java
index 9ec8d3660..f3820f378 100644
--- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java
@@ -8,13 +8,14 @@
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.google.inject.Inject;
+import com.timgroup.statsd.StatsDClient;
import java.time.Clock;
public class DefaultDedupQueueService extends AbstractQueueService implements DedupQueueService {
@Inject
public DefaultDedupQueueService(DedupEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry,
- Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
- super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock,adminService,producerService,stepFunctionService );
+ Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) {
+ super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock,adminService,producerService,stepFunctionService, statsDClient );
}
}
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java
index 524ca5003..8a8a72ad8 100644
--- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java
@@ -8,13 +8,14 @@
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.google.inject.Inject;
+import com.timgroup.statsd.StatsDClient;
import java.time.Clock;
public class DefaultQueueService extends AbstractQueueService implements QueueService {
@Inject
public DefaultQueueService(EventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry,
- Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
- super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock,adminService, producerService,stepFunctionService);
+ Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) {
+ super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock,adminService, producerService,stepFunctionService, statsDClient);
}
}
diff --git a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java
index 0ac9a0ed8..36d4232d4 100644
--- a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java
+++ b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java
@@ -7,6 +7,7 @@
import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
+import com.timgroup.statsd.StatsDClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.testng.annotations.Test;
@@ -41,7 +42,7 @@ public void testSizeCache() {
BaseEventStore mockEventStore = mock(BaseEventStore.class);
AbstractQueueService queueService = new AbstractQueueService(mockEventStore, mock(JobService.class),
- mock(JobHandlerRegistry.class), mock(JobType.class), clock, mock(KafkaAdminService.class), mock(KafkaProducerService.class), mock(StepFunctionService.class)){};
+ mock(JobHandlerRegistry.class), mock(JobType.class), clock, mock(KafkaAdminService.class), mock(KafkaProducerService.class), mock(StepFunctionService.class), mock(StatsDClient.class)){};
// At limit=500, size estimate should be at 4800
// At limit=50, size estimate should be at 5000