Skip to content

Commit 840f87c

Browse files
[fix][broker] Fix TableViewLoadDataStoreImpl NPE (#21777)
1 parent 4e96e7f commit 840f87c

File tree

2 files changed

+50
-14
lines changed

2 files changed

+50
-14
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.pulsar.client.api.PulsarClientException;
3030
import org.apache.pulsar.client.api.Schema;
3131
import org.apache.pulsar.client.api.TableView;
32+
import org.apache.pulsar.common.util.FutureUtil;
3233

3334
/**
3435
* The load data store, base on {@link TableView <T>}.
@@ -58,54 +59,60 @@ public TableViewLoadDataStoreImpl(PulsarClient client, String topic, Class<T> cl
5859
}
5960

6061
@Override
61-
public CompletableFuture<Void> pushAsync(String key, T loadData) {
62+
public synchronized CompletableFuture<Void> pushAsync(String key, T loadData) {
63+
if (producer == null) {
64+
return FutureUtil.failedFuture(new IllegalStateException("producer has not been started"));
65+
}
6266
return producer.newMessage().key(key).value(loadData).sendAsync().thenAccept(__ -> {});
6367
}
6468

6569
@Override
66-
public CompletableFuture<Void> removeAsync(String key) {
70+
public synchronized CompletableFuture<Void> removeAsync(String key) {
71+
if (producer == null) {
72+
return FutureUtil.failedFuture(new IllegalStateException("producer has not been started"));
73+
}
6774
return producer.newMessage().key(key).value(null).sendAsync().thenAccept(__ -> {});
6875
}
6976

7077
@Override
71-
public Optional<T> get(String key) {
78+
public synchronized Optional<T> get(String key) {
7279
validateTableViewStart();
7380
return Optional.ofNullable(tableView.get(key));
7481
}
7582

7683
@Override
77-
public void forEach(BiConsumer<String, T> action) {
84+
public synchronized void forEach(BiConsumer<String, T> action) {
7885
validateTableViewStart();
7986
tableView.forEach(action);
8087
}
8188

82-
public Set<Map.Entry<String, T>> entrySet() {
89+
public synchronized Set<Map.Entry<String, T>> entrySet() {
8390
validateTableViewStart();
8491
return tableView.entrySet();
8592
}
8693

8794
@Override
88-
public int size() {
95+
public synchronized int size() {
8996
validateTableViewStart();
9097
return tableView.size();
9198
}
9299

93100
@Override
94-
public void closeTableView() throws IOException {
101+
public synchronized void closeTableView() throws IOException {
95102
if (tableView != null) {
96103
tableView.close();
97104
tableView = null;
98105
}
99106
}
100107

101108
@Override
102-
public void start() throws LoadDataStoreException {
109+
public synchronized void start() throws LoadDataStoreException {
103110
startProducer();
104111
startTableView();
105112
}
106113

107114
@Override
108-
public void startTableView() throws LoadDataStoreException {
115+
public synchronized void startTableView() throws LoadDataStoreException {
109116
if (tableView == null) {
110117
try {
111118
tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
@@ -117,7 +124,7 @@ public void startTableView() throws LoadDataStoreException {
117124
}
118125

119126
@Override
120-
public void startProducer() throws LoadDataStoreException {
127+
public synchronized void startProducer() throws LoadDataStoreException {
121128
if (producer == null) {
122129
try {
123130
producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create();
@@ -129,7 +136,7 @@ public void startProducer() throws LoadDataStoreException {
129136
}
130137

131138
@Override
132-
public void close() throws IOException {
139+
public synchronized void close() throws IOException {
133140
if (producer != null) {
134141
producer.close();
135142
producer = null;
@@ -138,15 +145,14 @@ public void close() throws IOException {
138145
}
139146

140147
@Override
141-
public void init() throws IOException {
148+
public synchronized void init() throws IOException {
142149
close();
143150
start();
144151
}
145152

146-
private void validateTableViewStart() {
153+
private synchronized void validateTableViewStart() {
147154
if (tableView == null) {
148155
throw new IllegalStateException("table view has not been started");
149156
}
150157
}
151-
152158
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.testng.Assert.assertEquals;
2222
import static org.testng.Assert.assertFalse;
2323
import static org.testng.Assert.assertNotNull;
24+
import static org.testng.Assert.fail;
2425
import static org.testng.AssertJUnit.assertTrue;
2526

2627
import com.google.common.collect.Sets;
@@ -39,6 +40,7 @@
3940
import java.util.HashMap;
4041
import java.util.Map;
4142
import java.util.UUID;
43+
import java.util.concurrent.ExecutionException;
4244

4345
@Test(groups = "broker")
4446
public class LoadDataStoreTest extends MockedPulsarServiceBaseTest {
@@ -154,4 +156,32 @@ public void testTableViewRestart() throws Exception {
154156
Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.get("1").get(), 2));
155157
}
156158

159+
@Test
160+
public void testProducerStop() throws Exception {
161+
String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
162+
LoadDataStore<Integer> loadDataStore =
163+
LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class);
164+
loadDataStore.startProducer();
165+
loadDataStore.pushAsync("1", 1).get();
166+
loadDataStore.removeAsync("1").get();
167+
168+
loadDataStore.close();
169+
170+
try {
171+
loadDataStore.pushAsync("2", 2).get();
172+
fail();
173+
} catch (ExecutionException ex) {
174+
assertTrue(ex.getCause() instanceof IllegalStateException);
175+
}
176+
try {
177+
loadDataStore.removeAsync("2").get();
178+
fail();
179+
} catch (ExecutionException ex) {
180+
assertTrue(ex.getCause() instanceof IllegalStateException);
181+
}
182+
loadDataStore.startProducer();
183+
loadDataStore.pushAsync("3", 3).get();
184+
loadDataStore.removeAsync("3").get();
185+
}
186+
157187
}

0 commit comments

Comments
 (0)