Skip to content
This repository was archived by the owner on Feb 10, 2025. It is now read-only.

[PD-258160] Datadog Metrics Addition for Queue name and size #836

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions queue/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,5 +135,10 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-ssm</artifactId>
</dependency>
<dependency>
<groupId>com.datadoghq</groupId>
<artifactId>java-dogstatsd-client</artifactId>
<version>2.3</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import com.timgroup.statsd.StatsDClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -62,19 +63,21 @@ abstract class AbstractQueueService implements BaseQueueService {
public static final int MAX_MESSAGE_SIZE_IN_BYTES = 30 * 1024;
private final StepFunctionService stepFunctionService;
private final ParameterStoreUtil parameterStoreUtil;
private final StatsDClient _statsDClient;

private static final String UNIVERSE = KafkaConfig.getUniverseFromEnv();
protected AbstractQueueService(BaseEventStore eventStore, JobService jobService,
JobHandlerRegistry jobHandlerRegistry,
JobType<MoveQueueRequest, MoveQueueResult> moveQueueJobType,
Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) {
_eventStore = eventStore;
_jobService = jobService;
_moveQueueJobType = moveQueueJobType;
this.adminService = adminService;
this.producerService = producerService;
this.stepFunctionService = stepFunctionService;
this.parameterStoreUtil = new ParameterStoreUtil();
_statsDClient = statsDClient;


registerMoveQueueJobHandler(jobHandlerRegistry);
Expand Down Expand Up @@ -192,6 +195,7 @@ public void sendAll(Map<String, ? extends Collection<?>> messagesByQueue) {
validateMessage(message);
events.add(message.toString());
}
_statsDClient.recordGaugeValue("queue.messages.size", events.size(), "queue:" + queue);
builder.putAll(queue, events);
}

Expand Down Expand Up @@ -229,6 +233,7 @@ public void sendAll(String queue, Collection<?> messages, boolean fromKafka) {
"Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size");
events.add(messageByteBuffer);
}
_statsDClient.recordGaugeValue("queue.messages.size", events.size(), "queue:" + queue);
builder.putAll(queue, events);
Multimap<String, ByteBuffer> eventsByChannel = builder.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.google.inject.Inject;
import com.timgroup.statsd.StatsDClient;

import java.time.Clock;

public class DefaultDedupQueueService extends AbstractQueueService implements DedupQueueService {
@Inject
public DefaultDedupQueueService(DedupEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry,
Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock,adminService,producerService,stepFunctionService );
Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) {
super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock,adminService,producerService,stepFunctionService, statsDClient );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.google.inject.Inject;
import com.timgroup.statsd.StatsDClient;

import java.time.Clock;

public class DefaultQueueService extends AbstractQueueService implements QueueService {
@Inject
public DefaultQueueService(EventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry,
Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock,adminService, producerService,stepFunctionService);
Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) {
super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock,adminService, producerService,stepFunctionService, statsDClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.timgroup.statsd.StatsDClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -41,7 +42,7 @@ public void testSizeCache() {

BaseEventStore mockEventStore = mock(BaseEventStore.class);
AbstractQueueService queueService = new AbstractQueueService(mockEventStore, mock(JobService.class),
mock(JobHandlerRegistry.class), mock(JobType.class), clock, mock(KafkaAdminService.class), mock(KafkaProducerService.class), mock(StepFunctionService.class)){};
mock(JobHandlerRegistry.class), mock(JobType.class), clock, mock(KafkaAdminService.class), mock(KafkaProducerService.class), mock(StepFunctionService.class), mock(StatsDClient.class)){};

// At limit=500, size estimate should be at 4800
// At limit=50, size estimate should be at 5000
Expand Down