Skip to content
This repository was archived by the owner on Feb 10, 2025. It is now read-only.

Databus Throttling Changes #847

Merged
merged 27 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
cce3d28
feat: execution name sanitization and unit test for stepfn
rishuyadavbv Oct 28, 2024
49cfe91
master_size api changes
nabajyotiDash-hub Nov 4, 2024
f082d99
Admin API changes (#845)
shan-1094 Nov 4, 2024
05fb5ed
PD-257044-changes-for-databus (#846)
anurag0510 Nov 4, 2024
62dc31e
added datadog metrics changes
anurag0510 Oct 26, 2024
72ed737
[PD-257044] databus changes rebase (#848)
anurag0510 Nov 5, 2024
0dad904
admin api and queue name split
anandujayan Nov 5, 2024
63d05f0
Pd 257044 databus changes rebase (#849)
anurag0510 Nov 5, 2024
be3df07
Revert "added datadog metrics changes"
nabajyotiDash-hub Nov 5, 2024
6b0b672
build fix
nabajyotiDash-hub Nov 5, 2024
6fab83e
endpoint change and databus changes
anandujayan Nov 5, 2024
ff905b0
updated updateRefInDatabus method
anurag0510 Nov 6, 2024
02c3a35
address review commnets
nabajyotiDash-hub Nov 6, 2024
7d71b48
branch admin -prepare release emodb-6.5.205
Nov 6, 2024
6193999
branch admin -prepare for next development iteration
Nov 6, 2024
05cfcbf
edit doc
nabajyotiDash-hub Nov 6, 2024
1861f8b
fix tests
nabajyotiDash-hub Nov 6, 2024
da60cdb
fix tests
nabajyotiDash-hub Nov 6, 2024
a40a48c
fix review comments
nabajyotiDash-hub Nov 6, 2024
ab2c2d1
removed ;
nabajyotiDash-hub Nov 6, 2024
6e1cf00
branch admin -prepare release emodb-6.5.206
Nov 6, 2024
3373932
upgrade pom version
nabajyotiDash-hub Nov 6, 2024
2941007
branch admin -prepare release emodb-6.5.208
Nov 6, 2024
54122d9
branch admin -prepare for next development iteration
Nov 6, 2024
173f50b
get or default change
anandujayan Nov 6, 2024
13847e8
sor get or default changes
anandujayan Nov 6, 2024
fcd125a
Unit test fixes
anandujayan Nov 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ Subscription getSubscription(String subscription)
*/
long getEventCountUpTo(String subscription, long limit);

default long getMasterCount(){
return 0;
};

/** Counts the number of events with outstanding claims that cause the events to be skipped by {@link #poll}. */
long getClaimCount(String subscription);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public long getEventCountUpTo(String subscription, long limit) {
return _ownerAwareDatabus.getEventCountUpTo(ownerId, subscription, limit);
}

@Override
public long getMasterCount() {
return _ownerAwareDatabus.getMasterEventCountUncached(ownerId);
}

@Override
public long getClaimCount(String subscription) {
return _ownerAwareDatabus.getClaimCount(ownerId, subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,18 @@ public long getEventCount(String ownerId, String subscription) {
return getEventCountUpTo(ownerId, subscription, Long.MAX_VALUE);
}

@Override
public long getMasterEventCountUncached(String ownerId) {
_log.info("Inside getMasterEventCountUncached with _masterFanoutChannels length {}", _masterFanoutChannels.size());
long size = 0;
for(String channel : _masterFanoutChannels) {
size += _eventStore.getSizeEstimate(channel, Long.MAX_VALUE);
_log.info("From channel size {} {}:", channel, size);
}
return size;
}


@Override
public long getEventCountUpTo(String ownerId, String subscription, long limit) {
checkSubscriptionOwner(ownerId, subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ Subscription getSubscription(String ownerId, String subscription)
long getEventCount(String ownerId, String subscription)
throws UnauthorizedSubscriptionException;

public long getMasterEventCountUncached(String ownerId)
throws UnauthorizedSubscriptionException;;

long getEventCountUpTo(String ownerId, String subscription, long limit)
throws UnauthorizedSubscriptionException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.bazaarvoice.emodb.sor.api.Intrinsic;
import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
import com.bazaarvoice.emodb.sor.delta.Deltas;
import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableList;
Expand All @@ -22,6 +23,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
Expand All @@ -38,7 +40,7 @@ public class TableAuthIdentityManagerDAOTest {
*/
@Test
public void testRebuildIdIndex() {
DataStore dataStore = new InMemoryDataStore(new MetricRegistry());
DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
Supplier<String> idSupplier = () -> "id0";
TableAuthIdentityManagerDAO<ApiKey> tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>(
ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys",
Expand Down Expand Up @@ -76,7 +78,7 @@ public void testRebuildIdIndex() {

@Test
public void testGrandfatheredInId() {
DataStore dataStore = new InMemoryDataStore(new MetricRegistry());
DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
Supplier<String> idSupplier = () -> "id0";
TableAuthIdentityManagerDAO<ApiKey> tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>(
ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys",
Expand Down Expand Up @@ -128,7 +130,7 @@ public void testGrandfatheredInId() {

@Test
public void testIdAttributeCompatibility() {
DataStore dataStore = new InMemoryDataStore(new MetricRegistry());
DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
Supplier<String> idSupplier = () -> "id0";
TableAuthIdentityManagerDAO<ApiKey> tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>(
ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.bazaarvoice.emodb.sor.api.WriteConsistency;
import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
import com.bazaarvoice.emodb.sor.delta.Deltas;
import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.web.auth.EmoPermissionResolver;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableList;
Expand All @@ -36,10 +37,7 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
Expand All @@ -63,7 +61,7 @@ public class TableRoleManagerDAOTest {
@BeforeMethod
public void setUp() {
// DataStore and PermissionManager are fairly heavy to fully mock. Use spies on in-memory implementations instead
_backendDataStore = new InMemoryDataStore(new MetricRegistry());
_backendDataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
_dataStore = spy(_backendDataStore);
_permissionResolver = new EmoPermissionResolver(null, null);
_backendPermissionManager = new InMemoryPermissionManager(_permissionResolver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.bazaarvoice.emodb.sor.db.astyanax.DeltaPlacementFactory;
import com.bazaarvoice.emodb.sor.delta.Delta;
import com.bazaarvoice.emodb.sor.delta.Deltas;
import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.table.db.Table;
import com.bazaarvoice.emodb.table.db.astyanax.AstyanaxTableDAO;
import com.bazaarvoice.emodb.table.db.astyanax.CQLStashTableDAO;
Expand Down Expand Up @@ -97,7 +98,7 @@ public void setup() throws Exception {
_astyanaxTableDAO.setCQLStashTableDAO(cqlStashTableDAO);
// Don't store table definitions in the actual backing store so as not to interrupt other tests. Use a
// private in-memory implementation.
_tableBackingStore = new InMemoryDataStore(new MetricRegistry());
_tableBackingStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
_astyanaxTableDAO.setBackingStore(_tableBackingStore);

_lifeCycleRegistry.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public interface BaseQueueService {
*/
long getMessageCount(String queue);

/**
* Get the queue size without caching
* <p/>
* returns long
*/
default long getUncachedSize(String queue){
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public interface DedupQueueService extends BaseQueueService {

void sendAll(Map<String, ? extends Collection<?>> messagesByQueue);

//Overloaded sendAll method to send to cassandra
void sendAll(String queue, Collection<?>messages, boolean isFlush);

/**
Expand All @@ -26,6 +27,10 @@ public interface DedupQueueService extends BaseQueueService {
*/
long getMessageCount(String queue);

default long getUncachedSize(String queue){
return 0;
}

/**
* Counts the total number of messages for the specified queue, accurate up to the specified limit. Beyond the
* specified limit the message count will be a rough estimate, allowing the caller to make the trade-off between
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public interface QueueService extends BaseQueueService {

void sendAll(String queue, Collection<?> messages);


void sendAll(Map<String, ? extends Collection<?>> messagesByQueue);

//Overloaded sendAll method to send to cassandra
Expand All @@ -28,6 +27,15 @@ public interface QueueService extends BaseQueueService {
*/
long getMessageCount(String queue);

/**
* Get the queue size without caching
* <p/>
* returns long
*/
default long getUncachedSize(String queue){
return 0;
}

/**
* Counts the total number of messages for the specified queue, accurate up to the specified limit. Beyond the
* specified limit the message count will be a rough estimate, allowing the caller to make the trade-off between
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,12 +454,10 @@ private void startStepFunctionExecution(Map<String, String> parameters, String q

String inputPayload = createInputPayload(queueThreshold, batchSize, queueType, queueName, topic, interval);

// Create the timestamp
String timestamp = String.valueOf(System.currentTimeMillis()); // Current time in milliseconds

queueName = stepFunctionService.sanitizeExecutionName(queueName);
// Check if queueType is "dedupq" and prepend "D" to execution name if true
String executionName = (queueType.equalsIgnoreCase("dedupq") ? "D_" : "") + queueName + "_" + timestamp;

String executionName = (queueType.equalsIgnoreCase("dedupq") ? "D_" : "") + queueName + "_" + timestamp;;
// Start the Step Function execution
stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.bazaarvoice.emodb.queue.core.Entities;

public class ExecutionInputWrapper {
private QueueExecutionAttributes executionInput;

// Getter and Setter
public QueueExecutionAttributes getExecutionInput() {
return executionInput;
}

public void setExecutionInput(QueueExecutionAttributes executionInput) {
this.executionInput = executionInput;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.bazaarvoice.emodb.queue.core.Entities;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;
import java.util.Objects;

@JsonIgnoreProperties(ignoreUnknown = true)
public class QueueExecutionAttributes {

private String queueName;
private String queueType;
private String topicName;
private Integer queueThreshold;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this an Object (Integer) and not a primitive 'integer'?
I suppose this should be a 'long', to support really long size value

private Integer batchSize;
private Integer interval;
private String status;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Status should be a boolean or ENUM, random string will be confusing and maintenance hassle


public void setQueueName(String queueName) {
this.queueName = queueName;
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}

public void setQueueType(String queueType) {
this.queueType = queueType;
}

public void setTopicName(String topicName) {
this.topicName = topicName;
}

public void setQueueThreshold(int queueThreshold) {
this.queueThreshold = queueThreshold;
}

public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}

public void setInterval(int interval) {
this.interval = interval;
}

public Integer getQueueThreshold() {
return queueThreshold;
}

public Integer getBatchSize() {
return batchSize;
}

public Integer getInterval() {
return interval;
}

public String getQueueName() {
return queueName;
}

public String getQueueType() {
return queueType;
}

public String getTopicName() {
return topicName;
}

public String getJsonPayload(QueueExecutionAttributes attributes) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(attributes);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof QueueExecutionAttributes)) return false;
QueueExecutionAttributes that = (QueueExecutionAttributes) o;
return Objects.equals(getQueueName(), that.getQueueName()) && Objects.equals(getQueueType(), that.getQueueType()) && Objects.equals(getTopicName(), that.getTopicName()) && Objects.equals(getQueueThreshold(), that.getQueueThreshold()) && Objects.equals(getBatchSize(), that.getBatchSize()) && Objects.equals(getInterval(), that.getInterval());
}

@Override
public int hashCode() {
return Objects.hash(getQueueName(), getQueueType(), getTopicName(), getQueueThreshold(), getBatchSize(), getInterval());
}

@Override
public String toString() {
return "QueueExecutionAttributes{" +
"queueName='" + queueName + '\'' +
", queueType='" + queueType + '\'' +
", topicName='" + topicName + '\'' +
", queueThreshold=" + queueThreshold +
", batchSize=" + batchSize +
", interval=" + interval +
", status='" + status + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void sendMessages(String topic, Collection<String> events, String queueTy
throw new RuntimeException("Error sending messages to Kafka", e);
}
}
_log.info("Finished sending messages to topic '{}' time taken : {} milliseconds", topic, Duration.between(startTime,LocalDateTime.now()).toMillis());
_log.info("Finished sending messages to topic '{}' time taken : {} milliseconds", topic, Duration.between(startTime, LocalDateTime.now()).toMillis());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do log the events.size(O as well in this log, will be easy for analytics, on time taken per topic for the events size...

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder;
import com.amazonaws.services.simplesystemsmanagement.model.GetParameterRequest;
import com.amazonaws.services.simplesystemsmanagement.model.GetParameterResult;
import com.amazonaws.services.simplesystemsmanagement.model.PutParameterRequest;
import com.amazonaws.services.simplesystemsmanagement.model.PutParameterResult;
import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest;
import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult;
import com.amazonaws.services.simplesystemsmanagement.model.ParameterNotFoundException;
Expand Down Expand Up @@ -104,4 +106,22 @@ public Map<String, String> getParameters(List<String> parameterNames) {
}
}

public Long updateParameter(String key, String value) {
try {
if (key == null || key.trim().isEmpty()) {
logger.error("parameter name cannot be null or blank");
throw new IllegalArgumentException("parameter name cannot be null or blank");
}

PutParameterRequest request = new PutParameterRequest().withName(key).withValue(value).withOverwrite(true);

PutParameterResult response = ssmClient.putParameter(request);
logger.info("Successfully updated parameter: " + key + " with value: " + value + ", Update Version: " + response.getVersion());
return response.getVersion();
} catch (Exception e) {
logger.error("Failed to update parameter: " + key + " with value: " + value, e);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this log required? adding too much log events... anyway the thrown exception will be logged by the caller/catcher...

throw new RuntimeException("Unexpected error updating parameter: " + key + " with value: " + value, e);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why its a RuntimeException and not an Exception?

}
}

}
Loading