|
34 | 34 | import org.opensearch.action.search.SearchResponse;
|
35 | 35 | import org.opensearch.action.search.SearchType;
|
36 | 36 | import org.opensearch.action.support.WriteRequest;
|
| 37 | +import org.opensearch.action.support.replication.TransportReplicationAction; |
37 | 38 | import org.opensearch.action.termvectors.TermVectorsRequestBuilder;
|
38 | 39 | import org.opensearch.action.termvectors.TermVectorsResponse;
|
39 | 40 | import org.opensearch.action.update.UpdateResponse;
|
|
53 | 54 | import org.opensearch.common.util.set.Sets;
|
54 | 55 | import org.opensearch.core.common.unit.ByteSizeUnit;
|
55 | 56 | import org.opensearch.core.common.unit.ByteSizeValue;
|
| 57 | +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; |
56 | 58 | import org.opensearch.core.index.shard.ShardId;
|
57 | 59 | import org.opensearch.core.xcontent.XContentBuilder;
|
58 | 60 | import org.opensearch.index.IndexModule;
|
| 61 | +import org.opensearch.index.IndexSettings; |
59 | 62 | import org.opensearch.index.ReplicationStats;
|
60 | 63 | import org.opensearch.index.SegmentReplicationPerGroupStats;
|
61 | 64 | import org.opensearch.index.SegmentReplicationPressureService;
|
|
65 | 68 | import org.opensearch.index.shard.IndexShard;
|
66 | 69 | import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
|
67 | 70 | import org.opensearch.index.store.remote.filecache.FileCache;
|
| 71 | +import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction; |
68 | 72 | import org.opensearch.indices.replication.common.ReplicationType;
|
69 | 73 | import org.opensearch.node.Node;
|
70 | 74 | import org.opensearch.search.sort.SortOrder;
|
|
73 | 77 | import org.opensearch.test.OpenSearchIntegTestCase;
|
74 | 78 | import org.opensearch.test.junit.annotations.TestLogging;
|
75 | 79 | import org.opensearch.test.transport.MockTransportService;
|
| 80 | +import org.opensearch.transport.RemoteTransportException; |
76 | 81 | import org.opensearch.transport.TransportService;
|
77 | 82 | import org.opensearch.transport.client.Requests;
|
78 | 83 | import org.junit.After;
|
|
89 | 94 | import java.util.Set;
|
90 | 95 | import java.util.concurrent.CountDownLatch;
|
91 | 96 | import java.util.concurrent.TimeUnit;
|
| 97 | +import java.util.concurrent.atomic.AtomicBoolean; |
92 | 98 | import java.util.concurrent.atomic.AtomicReference;
|
93 | 99 | import java.util.stream.Collectors;
|
94 | 100 |
|
@@ -175,6 +181,54 @@ public void teardown() throws Exception {
|
175 | 181 | clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
|
176 | 182 | }
|
177 | 183 |
|
| 184 | + public void testPublishCheckPointFail() throws Exception { |
| 185 | + Settings mockNodeSetting = Settings.builder() |
| 186 | + .put(TransportReplicationAction.REPLICATION_RETRY_TIMEOUT.getKey(), TimeValue.timeValueSeconds(0)) |
| 187 | + .build(); |
| 188 | + |
| 189 | + final String primaryNode = internalCluster().startDataOnlyNode(mockNodeSetting); |
| 190 | + createIndex(INDEX_NAME); |
| 191 | + ensureYellowAndNoInitializingShards(INDEX_NAME); |
| 192 | + final String replicaNode = internalCluster().startDataOnlyNode(mockNodeSetting); |
| 193 | + ensureGreen(INDEX_NAME); |
| 194 | + |
| 195 | + MockTransportService replicaTransportService = ((MockTransportService) internalCluster().getInstance( |
| 196 | + TransportService.class, |
| 197 | + replicaNode |
| 198 | + )); |
| 199 | + AtomicBoolean mockReplicaReceivePublishCheckpointException = new AtomicBoolean(true); |
| 200 | + replicaTransportService.addRequestHandlingBehavior( |
| 201 | + PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX, |
| 202 | + (handler, request, channel, task) -> { |
| 203 | + if (mockReplicaReceivePublishCheckpointException.get()) { |
| 204 | + logger.info("mock remote transport exception"); |
| 205 | + throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException()); |
| 206 | + } |
| 207 | + logger.info("replica receive publish checkpoint request"); |
| 208 | + handler.messageReceived(request, channel, task); |
| 209 | + } |
| 210 | + ); |
| 211 | + |
| 212 | + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); |
| 213 | + logger.info("ensure publish checkpoint request can be process"); |
| 214 | + Thread.sleep(3000); |
| 215 | + mockReplicaReceivePublishCheckpointException.set(false); |
| 216 | + |
| 217 | + assertAcked( |
| 218 | + client().admin() |
| 219 | + .indices() |
| 220 | + .prepareUpdateSettings(INDEX_NAME) |
| 221 | + .setSettings( |
| 222 | + Settings.builder() |
| 223 | + .put(IndexSettings.INDEX_PUBLISH_CHECKPOINT_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1)) |
| 224 | + .put(IndexSettings.INDEX_LAG_TIME_BEFORE_RESEND_CHECKPOINT_SETTING.getKey(), TimeValue.timeValueSeconds(1)) |
| 225 | + ) |
| 226 | + ); |
| 227 | + |
| 228 | + waitForSearchableDocs(1, primaryNode, replicaNode); |
| 229 | + replicaTransportService.clearAllRules(); |
| 230 | + } |
| 231 | + |
178 | 232 | @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/17526")
|
179 | 233 | public void testRestartPrimary_NoReplicas() throws Exception {
|
180 | 234 | final String primary = internalCluster().startDataAndWarmNodes(1).get(0);
|
|
0 commit comments