-
Notifications
You must be signed in to change notification settings - Fork 45
Databus Throttling Changes #847
Changes from 13 commits
cce3d28
49cfe91
f082d99
05fb5ed
62dc31e
72ed737
0dad904
63d05f0
be3df07
6b0b672
6fab83e
ff905b0
02c3a35
7d71b48
6193999
05cfcbf
1861f8b
da60cdb
a40a48c
ab2c2d1
6e1cf00
3373932
2941007
54122d9
173f50b
13847e8
fcd125a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package com.bazaarvoice.emodb.queue.core.Entities; | ||
|
||
public class ExecutionInputWrapper { | ||
private QueueExecutionAttributes executionInput; | ||
|
||
// Getter and Setter | ||
public QueueExecutionAttributes getExecutionInput() { | ||
return executionInput; | ||
} | ||
|
||
public void setExecutionInput(QueueExecutionAttributes executionInput) { | ||
this.executionInput = executionInput; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
package com.bazaarvoice.emodb.queue.core.Entities; | ||
|
||
import com.fasterxml.jackson.annotation.JsonCreator; | ||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
|
||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
@JsonIgnoreProperties(ignoreUnknown = true) | ||
public class QueueExecutionAttributes { | ||
|
||
private String queueName; | ||
private String queueType; | ||
private String topicName; | ||
private Integer queueThreshold; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this an Object (Integer) and not a primitive 'integer'? |
||
private Integer batchSize; | ||
private Integer interval; | ||
private String status; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Status should be a boolean or ENUM, random string will be confusing and maintenance hassle |
||
|
||
public void setQueueName(String queueName) { | ||
this.queueName = queueName; | ||
} | ||
|
||
public String getStatus() { | ||
return status; | ||
} | ||
|
||
public void setStatus(String status) { | ||
this.status = status; | ||
} | ||
|
||
public void setQueueType(String queueType) { | ||
this.queueType = queueType; | ||
} | ||
|
||
public void setTopicName(String topicName) { | ||
this.topicName = topicName; | ||
} | ||
|
||
public void setQueueThreshold(int queueThreshold) { | ||
this.queueThreshold = queueThreshold; | ||
} | ||
|
||
public void setBatchSize(int batchSize) { | ||
this.batchSize = batchSize; | ||
} | ||
|
||
public void setInterval(int interval) { | ||
this.interval = interval; | ||
} | ||
|
||
public Integer getQueueThreshold() { | ||
return queueThreshold; | ||
} | ||
|
||
public Integer getBatchSize() { | ||
return batchSize; | ||
} | ||
|
||
public Integer getInterval() { | ||
return interval; | ||
} | ||
|
||
public String getQueueName() { | ||
return queueName; | ||
} | ||
|
||
public String getQueueType() { | ||
return queueType; | ||
} | ||
|
||
public String getTopicName() { | ||
return topicName; | ||
} | ||
|
||
public String getJsonPayload(QueueExecutionAttributes attributes) throws JsonProcessingException { | ||
ObjectMapper mapper = new ObjectMapper(); | ||
return mapper.writeValueAsString(attributes); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
if (!(o instanceof QueueExecutionAttributes)) return false; | ||
QueueExecutionAttributes that = (QueueExecutionAttributes) o; | ||
return Objects.equals(getQueueName(), that.getQueueName()) && Objects.equals(getQueueType(), that.getQueueType()) && Objects.equals(getTopicName(), that.getTopicName()) && Objects.equals(getQueueThreshold(), that.getQueueThreshold()) && Objects.equals(getBatchSize(), that.getBatchSize()) && Objects.equals(getInterval(), that.getInterval()); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(getQueueName(), getQueueType(), getTopicName(), getQueueThreshold(), getBatchSize(), getInterval()); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "QueueExecutionAttributes{" + | ||
"queueName='" + queueName + '\'' + | ||
", queueType='" + queueType + '\'' + | ||
", topicName='" + topicName + '\'' + | ||
", queueThreshold=" + queueThreshold + | ||
", batchSize=" + batchSize + | ||
", interval=" + interval + | ||
", status='" + status + '\'' + | ||
'}'; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,7 +46,7 @@ public void sendMessages(String topic, Collection<String> events, String queueTy | |
throw new RuntimeException("Error sending messages to Kafka", e); | ||
} | ||
} | ||
_log.info("Finished sending messages to topic '{}' time taken : {} milliseconds", topic, Duration.between(startTime,LocalDateTime.now()).toMillis()); | ||
_log.info("Finished sending messages to topic '{}' time taken : {} milliseconds", topic, Duration.between(startTime, LocalDateTime.now()).toMillis()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do log the events.size(O as well in this log, will be easy for analytics, on time taken per topic for the events size... |
||
} | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,8 @@ | |
import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder; | ||
import com.amazonaws.services.simplesystemsmanagement.model.GetParameterRequest; | ||
import com.amazonaws.services.simplesystemsmanagement.model.GetParameterResult; | ||
import com.amazonaws.services.simplesystemsmanagement.model.PutParameterRequest; | ||
import com.amazonaws.services.simplesystemsmanagement.model.PutParameterResult; | ||
import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest; | ||
import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult; | ||
import com.amazonaws.services.simplesystemsmanagement.model.ParameterNotFoundException; | ||
|
@@ -104,4 +106,22 @@ public Map<String, String> getParameters(List<String> parameterNames) { | |
} | ||
} | ||
|
||
public Long updateParameter(String key, String value) { | ||
try { | ||
if (key == null || key.trim().isEmpty()) { | ||
logger.error("parameter name cannot be null or blank"); | ||
throw new IllegalArgumentException("parameter name cannot be null or blank"); | ||
} | ||
|
||
PutParameterRequest request = new PutParameterRequest().withName(key).withValue(value).withOverwrite(true); | ||
|
||
PutParameterResult response = ssmClient.putParameter(request); | ||
logger.info("Successfully updated parameter: " + key + " with value: " + value + ", Update Version: " + response.getVersion()); | ||
return response.getVersion(); | ||
} catch (Exception e) { | ||
logger.error("Failed to update parameter: " + key + " with value: " + value, e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this log required? adding too much log events... anyway the thrown exception will be logged by the caller/catcher... |
||
throw new RuntimeException("Unexpected error updating parameter: " + key + " with value: " + value, e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why its a RuntimeException and not an Exception? |
||
} | ||
} | ||
|
||
} |
Uh oh!
There was an error while loading. Please reload this page.