8
8
9
9
package org .opensearch .remotemigration ;
10
10
11
- import com .carrotsearch .randomizedtesting .generators .RandomNumbers ;
12
-
13
- import org .opensearch .action .DocWriteResponse ;
14
11
import org .opensearch .action .admin .cluster .health .ClusterHealthResponse ;
15
12
import org .opensearch .action .admin .cluster .settings .ClusterUpdateSettingsRequest ;
13
+ import org .opensearch .action .admin .indices .replication .SegmentReplicationStatsResponse ;
16
14
import org .opensearch .action .admin .indices .settings .put .UpdateSettingsRequest ;
17
- import org .opensearch .action .delete .DeleteResponse ;
18
- import org .opensearch .action .index .IndexResponse ;
19
15
import org .opensearch .cluster .metadata .IndexMetadata ;
20
16
import org .opensearch .cluster .routing .allocation .command .MoveAllocationCommand ;
21
17
import org .opensearch .common .Priority ;
22
18
import org .opensearch .common .settings .Settings ;
23
19
import org .opensearch .common .unit .TimeValue ;
20
+ import org .opensearch .index .SegmentReplicationPerGroupStats ;
24
21
import org .opensearch .index .query .QueryBuilders ;
25
22
import org .opensearch .test .OpenSearchIntegTestCase ;
26
23
import org .opensearch .test .hamcrest .OpenSearchAssertions ;
27
24
28
- import java .util .concurrent .atomic .AtomicBoolean ;
29
- import java .util .concurrent .atomic .AtomicInteger ;
25
+ import java .util .concurrent .TimeUnit ;
30
26
31
27
import static org .opensearch .node .remotestore .RemoteStoreNodeService .MIGRATION_DIRECTION_SETTING ;
32
28
import static org .opensearch .node .remotestore .RemoteStoreNodeService .REMOTE_STORE_COMPATIBILITY_MODE_SETTING ;
33
29
import static org .opensearch .test .hamcrest .OpenSearchAssertions .assertAcked ;
34
30
35
31
@ OpenSearchIntegTestCase .ClusterScope (scope = OpenSearchIntegTestCase .Scope .TEST , numDataNodes = 0 , autoManageMasterNodes = false )
36
-
37
32
public class RemoteReplicaRecoveryIT extends MigrationBaseTestCase {
38
33
39
34
protected int maximumNumberOfShards () {
@@ -63,10 +58,8 @@ public void testReplicaRecovery() throws Exception {
63
58
client ().admin ().indices ().prepareCreate ("test" ).setSettings (indexSettings ()).setMapping ("field" , "type=text" ).get ();
64
59
String replicaNode = internalCluster ().startNode ();
65
60
ensureGreen ("test" );
66
-
67
- AtomicInteger numAutoGenDocs = new AtomicInteger ();
68
- final AtomicBoolean finished = new AtomicBoolean (false );
69
- Thread indexingThread = getThread (finished , numAutoGenDocs );
61
+ AsyncIndexingService asyncIndexingService = new AsyncIndexingService ("test" );
62
+ asyncIndexingService .startIndexing ();
70
63
71
64
refresh ("test" );
72
65
@@ -78,12 +71,10 @@ public void testReplicaRecovery() throws Exception {
78
71
updateSettingsRequest .persistentSettings (Settings .builder ().put (MIGRATION_DIRECTION_SETTING .getKey (), "remote_store" ));
79
72
assertAcked (client ().admin ().cluster ().updateSettings (updateSettingsRequest ).actionGet ());
80
73
81
- String remoteNode2 = internalCluster ().startNode ();
74
+ internalCluster ().startNode ();
82
75
internalCluster ().validateClusterFormed ();
83
76
84
77
// identify the primary
85
-
86
- Thread .sleep (RandomNumbers .randomIntBetween (random (), 0 , 2000 ));
87
78
logger .info ("--> relocating primary from {} to {} " , primaryNode , remoteNode );
88
79
client ().admin ()
89
80
.cluster ()
@@ -102,7 +93,6 @@ public void testReplicaRecovery() throws Exception {
102
93
103
94
assertEquals (0 , clusterHealthResponse .getRelocatingShards ());
104
95
logger .info ("--> relocation of primary from docrep to remote complete" );
105
- Thread .sleep (RandomNumbers .randomIntBetween (random (), 0 , 2000 ));
106
96
107
97
logger .info ("--> getting up the new replicas now to doc rep node as well as remote node " );
108
98
// Increase replica count to 3
@@ -129,52 +119,33 @@ public void testReplicaRecovery() throws Exception {
129
119
logger .info ("--> replica is up now on another docrep now as well as remote node" );
130
120
131
121
assertEquals (0 , clusterHealthResponse .getRelocatingShards ());
122
+ asyncIndexingService .stopIndexing ();
123
+ refresh ("test" );
132
124
133
- Thread .sleep (RandomNumbers .randomIntBetween (random (), 0 , 2000 ));
125
+ // segrep lag should be zero
126
+ assertBusy (() -> {
127
+ SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient ().admin ()
128
+ .indices ()
129
+ .prepareSegmentReplicationStats ("test" )
130
+ .setDetailed (true )
131
+ .execute ()
132
+ .actionGet ();
133
+ SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse .getReplicationStats ().get ("test" ).get (0 );
134
+ assertEquals (segmentReplicationStatsResponse .getReplicationStats ().size (), 1 );
135
+ perGroupStats .getReplicaStats ().stream ().forEach (e -> assertEquals (e .getCurrentReplicationLagMillis (), 0 ));
136
+ }, 20 , TimeUnit .SECONDS );
134
137
135
- // Stop replicas on docrep now.
136
- // ToDo : Remove once we have dual replication enabled
137
- client ().admin ()
138
- .indices ()
139
- .updateSettings (
140
- new UpdateSettingsRequest ("test" ).settings (
141
- Settings .builder ()
142
- .put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 1 )
143
- .put ("index.routing.allocation.exclude._name" , primaryNode + "," + replicaNode )
144
- .build ()
145
- )
146
- )
147
- .get ();
148
-
149
- finished .set (true );
150
- indexingThread .join ();
151
- refresh ("test" );
152
- OpenSearchAssertions .assertHitCount (client ().prepareSearch ("test" ).setTrackTotalHits (true ).get (), numAutoGenDocs .get ());
138
+ OpenSearchAssertions .assertHitCount (
139
+ client ().prepareSearch ("test" ).setTrackTotalHits (true ).get (),
140
+ asyncIndexingService .getIndexedDocs ()
141
+ );
153
142
OpenSearchAssertions .assertHitCount (
154
143
client ().prepareSearch ("test" )
155
144
.setTrackTotalHits (true )// extra paranoia ;)
156
145
.setQuery (QueryBuilders .termQuery ("auto" , true ))
157
- // .setPreference("_prefer_nodes:" + (remoteNode+ "," + remoteNode2))
158
146
.get (),
159
- numAutoGenDocs . get ()
147
+ asyncIndexingService . getIndexedDocs ()
160
148
);
161
149
162
150
}
163
-
164
- private Thread getThread (AtomicBoolean finished , AtomicInteger numAutoGenDocs ) {
165
- Thread indexingThread = new Thread (() -> {
166
- while (finished .get () == false && numAutoGenDocs .get () < 100 ) {
167
- IndexResponse indexResponse = client ().prepareIndex ("test" ).setId ("id" ).setSource ("field" , "value" ).get ();
168
- assertEquals (DocWriteResponse .Result .CREATED , indexResponse .getResult ());
169
- DeleteResponse deleteResponse = client ().prepareDelete ("test" , "id" ).get ();
170
- assertEquals (DocWriteResponse .Result .DELETED , deleteResponse .getResult ());
171
- client ().prepareIndex ("test" ).setSource ("auto" , true ).get ();
172
- numAutoGenDocs .incrementAndGet ();
173
- logger .info ("Indexed {} docs here" , numAutoGenDocs .get ());
174
- }
175
- });
176
- indexingThread .start ();
177
- return indexingThread ;
178
- }
179
-
180
151
}
0 commit comments