Skip to content
This repository was archived by the owner on Feb 10, 2025. It is now read-only.

PD-257044-changes-for-databus #846

Merged
merged 1 commit into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.bazaarvoice.emodb.auth.apikey.ApiKey;
import com.bazaarvoice.emodb.auth.apikey.ApiKeyModification;
import com.bazaarvoice.emodb.auth.identity.TableAuthIdentityManagerDAO;
import com.bazaarvoice.emodb.event.api.BaseEventStore;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.api.AuditBuilder;
import com.bazaarvoice.emodb.sor.api.DataStore;
import com.bazaarvoice.emodb.sor.api.Intrinsic;
Expand All @@ -22,6 +24,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
Expand All @@ -38,7 +41,7 @@ public class TableAuthIdentityManagerDAOTest {
*/
@Test
public void testRebuildIdIndex() {
DataStore dataStore = new InMemoryDataStore(new MetricRegistry());
DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
Supplier<String> idSupplier = () -> "id0";
TableAuthIdentityManagerDAO<ApiKey> tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>(
ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys",
Expand Down Expand Up @@ -76,7 +79,7 @@ public void testRebuildIdIndex() {

@Test
public void testGrandfatheredInId() {
DataStore dataStore = new InMemoryDataStore(new MetricRegistry());
DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
Supplier<String> idSupplier = () -> "id0";
TableAuthIdentityManagerDAO<ApiKey> tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>(
ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys",
Expand Down Expand Up @@ -128,7 +131,7 @@ public void testGrandfatheredInId() {

@Test
public void testIdAttributeCompatibility() {
DataStore dataStore = new InMemoryDataStore(new MetricRegistry());
DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
Supplier<String> idSupplier = () -> "id0";
TableAuthIdentityManagerDAO<ApiKey> tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>(
ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.bazaarvoice.emodb.auth.role.RoleModification;
import com.bazaarvoice.emodb.auth.role.RoleNotFoundException;
import com.bazaarvoice.emodb.auth.role.TableRoleManagerDAO;
import com.bazaarvoice.emodb.event.api.BaseEventStore;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.api.Audit;
import com.bazaarvoice.emodb.sor.api.DataStore;
import com.bazaarvoice.emodb.sor.api.Intrinsic;
Expand Down Expand Up @@ -36,10 +38,7 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
Expand All @@ -63,7 +62,7 @@ public class TableRoleManagerDAOTest {
@BeforeMethod
public void setUp() {
// DataStore and PermissionManager are fairly heavy to fully mock. Use spies on in-memory implementations instead
_backendDataStore = new InMemoryDataStore(new MetricRegistry());
_backendDataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
_dataStore = spy(_backendDataStore);
_permissionResolver = new EmoPermissionResolver(null, null);
_backendPermissionManager = new InMemoryPermissionManager(_permissionResolver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.bazaarvoice.emodb.common.zookeeper.store.ValueStore;
import com.bazaarvoice.emodb.datacenter.api.DataCenter;
import com.bazaarvoice.emodb.datacenter.api.DataCenters;
import com.bazaarvoice.emodb.event.api.BaseEventStore;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.api.Audit;
import com.bazaarvoice.emodb.sor.api.AuditBuilder;
import com.bazaarvoice.emodb.sor.condition.Conditions;
Expand Down Expand Up @@ -97,7 +99,7 @@ public void setup() throws Exception {
_astyanaxTableDAO.setCQLStashTableDAO(cqlStashTableDAO);
// Don't store table definitions in the actual backing store so as not to interrupt other tests. Use a
// private in-memory implementation.
_tableBackingStore = new InMemoryDataStore(new MetricRegistry());
_tableBackingStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
_astyanaxTableDAO.setBackingStore(_tableBackingStore);

_lifeCycleRegistry.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ public KafkaProducerService() {
* @param topic The Kafka topic.
* @param events The collection of messages to be sent.
*/
public void sendMessages(String topic, Collection<String> events, String queueType) {
public <T> void sendMessages(String topic, Collection<T> events, String queueType) {
LocalDateTime startTime = LocalDateTime.now();
_log.info("Sending {} messages to topic '{}'", events.size(), topic);
List<Future<RecordMetadata>> futures = new ArrayList<>();
// Use async sendMessage and collect futures
for (String event : events) {
futures.add(producer.send(new ProducerRecord<>(topic, event)));
for (T event : events) {
futures.add(producer.send(new ProducerRecord<>(topic, event.toString())));
}

// Wait for all futures to complete
Expand All @@ -46,7 +46,7 @@ public void sendMessages(String topic, Collection<String> events, String queueTy
throw new RuntimeException("Error sending messages to Kafka", e);
}
}
_log.info("Finished sending messages to topic '{}' time taken : {} milliseconds", topic, Duration.between(startTime,LocalDateTime.now()).toMillis());
_log.info("Finished sending messages to topic '{}' time taken : {} milliseconds", topic, Duration.between(startTime, LocalDateTime.now()).toMillis());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,10 @@ void dropFacade(String table, String placement, Audit audit)
*/
URI getStashRoot()
throws StashNotAvailableException;

default void updateRefInDatabus(Iterable<UpdateRefModel> updateRefs, Set<String> tags, boolean isFacade) {
/*
* This method is a no-op in the default implementation. It is used by the Databus to update the reference
*/
}
}
22 changes: 22 additions & 0 deletions sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/Names.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package com.bazaarvoice.emodb.sor.api;

import com.google.common.base.CharMatcher;

public abstract class Names {

/** Prevent instantiation. */
private Names() {}

private static final CharMatcher QUEUE_NAME_ALLOWED =
CharMatcher.inRange('a', 'z')
.or(CharMatcher.inRange('0', '9'))
.or(CharMatcher.anyOf("-.:@_"))
.precomputed();

/**
* Table names must be lowercase ASCII strings. between 1 and 255 characters in length. Whitespace, ISO control
* characters and certain punctuation characters that aren't generally allowed in file names or in elasticsearch
Expand All @@ -19,4 +27,18 @@ public static boolean isLegalTableAttributeName(String attributeName) {
// The attributes should not start with "~" which is reserved for Emodb's internal use
return !attributeName.startsWith("~");
}

/**
* Queue names must be lowercase ASCII strings. between 1 and 255 characters in length. Whitespace, ISO control
* characters and most punctuation characters aren't allowed. Queue names may not begin with a single underscore
* to allow URL space for extensions such as "/_queue/...". Queue names may not look like relative paths, ie.
* "." or "..".
*/
public static boolean isLegalQueueName(String queue) {
return queue != null &&
queue.length() > 0 && queue.length() <= 255 &&
!(queue.charAt(0) == '_' && !queue.startsWith("__")) &&
!(queue.charAt(0) == '.' && (".".equals(queue) || "..".equals(queue))) &&
QUEUE_NAME_ALLOWED.matchesAllOf(queue);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.bazaarvoice.emodb.sor.api;

import java.util.Objects;
import java.util.Set;
import java.util.UUID;

import static java.util.Objects.hash;
import static java.util.Objects.requireNonNull;

/**
* Reference to a System of Record update.
*/
public final class UpdateRefModel {
public static final String TAGS_NAME = "~tags";
private final String _table;
private final String _key;
private final UUID _changeId;
private final Set<String> _tags;

public UpdateRefModel(String table, String key, UUID changeId, Set<String> tags) {
_table = requireNonNull(table, "table");
_key = requireNonNull(key, "key");
_changeId = requireNonNull(changeId, "changeId");
_tags = requireNonNull(tags, "tags");
}

public String getTable() {
return _table;
}

public String getKey() {
return _key;
}

public UUID getChangeId() {
return _changeId;
}

public Set<String> getTags() {
return _tags;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof UpdateRefModel)) {
return false;
}
UpdateRefModel that = (UpdateRefModel) o;
return Objects.equals(_table, that._table) &&
Objects.equals(_key, that._key) &&
Objects.equals(_changeId, that._changeId) &&
Objects.equals(_tags, that._tags);
}

@Override
public int hashCode() {
return hash(_table, _key, _changeId, _tags);
}

}
6 changes: 6 additions & 0 deletions sor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -348,5 +348,11 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.bazaarvoice.emodb</groupId>
<artifactId>emodb-queue</artifactId>
<version>6.5.205-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
Loading