|
35 | 35 | import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected;
|
36 | 36 | import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
|
37 | 37 | import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
|
38 |
| -import static org.hamcrest.MatcherAssert.assertThat; |
39 |
| -import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
40 |
| -import static org.hamcrest.Matchers.lessThanOrEqualTo; |
| 38 | +import static org.assertj.core.api.Assertions.assertThat; |
41 | 39 | import static org.mockito.ArgumentMatchers.any;
|
42 | 40 | import static org.mockito.ArgumentMatchers.anyLong;
|
43 | 41 | import static org.mockito.ArgumentMatchers.eq;
|
@@ -258,10 +256,18 @@ public void channelOwnerTest() throws Exception {
|
258 | 256 | assertEquals(channelOwner1, channelOwner2);
|
259 | 257 | LeaderElectionService leaderElectionService1 = (LeaderElectionService) FieldUtils.readDeclaredField(
|
260 | 258 | channel1, "leaderElectionService", true);
|
261 |
| - leaderElectionService1.close(); |
262 |
| - waitUntilNewChannelOwner(channel2, channelOwner1); |
263 |
| - leaderElectionService1.start(); |
264 |
| - waitUntilNewChannelOwner(channel1, channelOwner1); |
| 259 | + |
| 260 | + // delete the leader node to trigger a new leader election |
| 261 | + pulsar1.getLocalMetadataStore().delete("/loadbalance/leader", Optional.of(-1L)).get(); |
| 262 | + |
| 263 | + // wait for leader to lose leadership |
| 264 | + Thread.sleep(500); |
| 265 | + |
| 266 | + // wait for leader election to happen |
| 267 | + Awaitility.await().untilAsserted(() -> { |
| 268 | + assertThat(channel1.getChannelOwnerAsync().get()).isPresent(); |
| 269 | + assertThat(channel2.getChannelOwnerAsync().get()).isPresent(); |
| 270 | + }); |
265 | 271 |
|
266 | 272 | var newChannelOwner1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS);
|
267 | 273 | var newChannelOwner2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS);
|
@@ -698,35 +704,31 @@ public void handleMetadataSessionEventTest() throws IllegalAccessException {
|
698 | 704 | var lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1);
|
699 | 705 |
|
700 | 706 | assertEquals(SessionReestablished, lastMetadataSessionEvent);
|
701 |
| - assertThat(lastMetadataSessionEventTimestamp, |
702 |
| - greaterThanOrEqualTo(ts)); |
| 707 | + assertThat(lastMetadataSessionEventTimestamp).isGreaterThanOrEqualTo(ts); |
703 | 708 |
|
704 | 709 | ts = System.currentTimeMillis();
|
705 | 710 | channel1.handleMetadataSessionEvent(SessionLost);
|
706 | 711 | lastMetadataSessionEvent = getLastMetadataSessionEvent(channel1);
|
707 | 712 | lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1);
|
708 | 713 |
|
709 | 714 | assertEquals(SessionLost, lastMetadataSessionEvent);
|
710 |
| - assertThat(lastMetadataSessionEventTimestamp, |
711 |
| - greaterThanOrEqualTo(ts)); |
| 715 | + assertThat(lastMetadataSessionEventTimestamp).isGreaterThanOrEqualTo(ts); |
712 | 716 |
|
713 | 717 | ts = System.currentTimeMillis();
|
714 | 718 | channel1.handleMetadataSessionEvent(ConnectionLost);
|
715 | 719 | lastMetadataSessionEvent = getLastMetadataSessionEvent(channel1);
|
716 | 720 | lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1);
|
717 | 721 |
|
718 | 722 | assertEquals(SessionLost, lastMetadataSessionEvent);
|
719 |
| - assertThat(lastMetadataSessionEventTimestamp, |
720 |
| - lessThanOrEqualTo(ts)); |
| 723 | + assertThat(lastMetadataSessionEventTimestamp).isLessThanOrEqualTo(ts); |
721 | 724 |
|
722 | 725 | ts = System.currentTimeMillis();
|
723 | 726 | channel1.handleMetadataSessionEvent(Reconnected);
|
724 | 727 | lastMetadataSessionEvent = getLastMetadataSessionEvent(channel1);
|
725 | 728 | lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1);
|
726 | 729 |
|
727 | 730 | assertEquals(SessionLost, lastMetadataSessionEvent);
|
728 |
| - assertThat(lastMetadataSessionEventTimestamp, |
729 |
| - lessThanOrEqualTo(ts)); |
| 731 | + assertThat(lastMetadataSessionEventTimestamp).isLessThanOrEqualTo(ts); |
730 | 732 |
|
731 | 733 | }
|
732 | 734 |
|
|
0 commit comments