10
10
11
11
import org .apache .logging .log4j .message .ParameterizedMessage ;
12
12
import org .apache .lucene .backward_codecs .store .EndiannessReverserUtil ;
13
+ import org .apache .lucene .codecs .CodecUtil ;
13
14
import org .apache .lucene .store .AlreadyClosedException ;
14
15
import org .apache .lucene .store .ByteArrayDataOutput ;
15
16
import org .apache .lucene .store .DataOutput ;
19
20
import org .opensearch .cluster .metadata .IndexMetadata ;
20
21
import org .opensearch .cluster .metadata .RepositoryMetadata ;
21
22
import org .opensearch .cluster .service .ClusterService ;
23
+ import org .opensearch .common .UUIDs ;
22
24
import org .opensearch .common .blobstore .BlobContainer ;
23
25
import org .opensearch .common .blobstore .BlobPath ;
24
26
import org .opensearch .common .blobstore .BlobStore ;
25
27
import org .opensearch .common .blobstore .fs .FsBlobContainer ;
26
28
import org .opensearch .common .blobstore .fs .FsBlobStore ;
27
29
import org .opensearch .common .bytes .ReleasableBytesReference ;
30
+ import org .opensearch .common .io .Channels ;
28
31
import org .opensearch .common .lease .Releasable ;
29
32
import org .opensearch .common .lease .Releasables ;
30
33
import org .opensearch .common .settings .ClusterSettings ;
75
78
import java .nio .file .Files ;
76
79
import java .nio .file .NoSuchFileException ;
77
80
import java .nio .file .Path ;
81
+ import java .nio .file .StandardOpenOption ;
78
82
import java .util .ArrayList ;
79
83
import java .util .Collection ;
80
84
import java .util .Collections ;
110
114
import static org .mockito .ArgumentMatchers .any ;
111
115
import static org .mockito .Mockito .doAnswer ;
112
116
import static org .mockito .Mockito .mock ;
117
+ import static org .mockito .Mockito .times ;
118
+ import static org .mockito .Mockito .verify ;
113
119
import static org .mockito .Mockito .when ;
114
120
115
121
@ LuceneTestCase .SuppressFileSystems ("ExtrasFS" )
@@ -1679,6 +1685,24 @@ public void testCloseIntoReader() throws IOException {
1679
1685
final int value = buffer .getInt ();
1680
1686
assertEquals (i , value );
1681
1687
}
1688
+
1689
+ // Try to read into the footer which would lead into EOF exception.
1690
+ assertThrowsReadingIntoFooter (reader , numOps , 4 );
1691
+ // Try to read beyond the footer which would lead into EOF exception.
1692
+ assertThrowsReadingIntoFooter (reader , numOps , 18 );
1693
+
1694
+ // Read next 16 bytes directly from the file, which should be the footer.
1695
+ // This is because for this test, we create a writer which would automatically
1696
+ // create one with footer.
1697
+ long translogLengthWithoutFooter = reader .length - TranslogFooter .footerLength ();
1698
+ ByteBuffer footerBuffer = ByteBuffer .allocate (TranslogFooter .footerLength ());
1699
+ Channels .readFromFileChannelWithEofException (reader .channel , translogLengthWithoutFooter , footerBuffer );
1700
+ footerBuffer .flip ();
1701
+ // Validate the footer.
1702
+ assertEquals (CodecUtil .FOOTER_MAGIC , footerBuffer .getInt ());
1703
+ assertEquals (0 , footerBuffer .getInt ()); // Algorithm ID
1704
+ assertEquals (reader .getTranslogChecksum ().longValue (), footerBuffer .getLong ());
1705
+
1682
1706
final Checkpoint readerCheckpoint = reader .getCheckpoint ();
1683
1707
assertThat (readerCheckpoint , equalTo (writerCheckpoint ));
1684
1708
} finally {
@@ -1687,6 +1711,12 @@ public void testCloseIntoReader() throws IOException {
1687
1711
}
1688
1712
}
1689
1713
1714
+ // assertThrowsReadingIntoFooter asserts EOF error when we try reading into the Translog footer via reader.
1715
+ private void assertThrowsReadingIntoFooter (TranslogReader reader , int numOps , int bytesToRead ) {
1716
+ final ByteBuffer buffer = ByteBuffer .allocate (bytesToRead );
1717
+ assertThrows (EOFException .class , () -> reader .readBytes (buffer , reader .getFirstOperationOffset () + numOps * 4 ));
1718
+ }
1719
+
1690
1720
public void testDownloadWithRetries () throws IOException {
1691
1721
long generation = 1 , primaryTerm = 1 ;
1692
1722
Path location = createTempDir ();
@@ -1805,6 +1835,110 @@ public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException {
1805
1835
assertArrayEquals (filesPostFirstDownload , filesPostSecondDownload );
1806
1836
}
1807
1837
1838
+ /**
1839
+ * createTranslogFile creates a translog file with the given generation and checksum at the provided location.
1840
+ * */
1841
+ private void createTranslogFile (Path location , long generation , long checksum ) throws IOException {
1842
+ Path translogPath = location .resolve (Translog .getFilename (generation ));
1843
+ Path checkpointPath = location .resolve (Translog .getCommitCheckpointFileName (generation ));
1844
+ Files .createFile (translogPath );
1845
+ Files .createFile (checkpointPath );
1846
+ try (FileChannel channel = FileChannel .open (translogPath , StandardOpenOption .WRITE )) {
1847
+ // Write a translog header
1848
+ TranslogHeader header = new TranslogHeader (UUIDs .randomBase64UUID (), generation );
1849
+ header .write (channel , true );
1850
+
1851
+ // Write some translog operations
1852
+ byte [] operationBytes = new byte [] { 1 , 2 , 3 , 4 };
1853
+ channel .write (ByteBuffer .wrap (operationBytes ));
1854
+
1855
+ // Write the translog footer
1856
+ TranslogFooter .write (channel , checksum , true );
1857
+ }
1858
+ }
1859
+
1860
+ /**
1861
+ * testIncrementalDownloadWithMatchingChecksum tests the scenario where we have the translog
1862
+ * file present locally. We test if the download logic for the same skips the download of the file.
1863
+ * */
1864
+ public void testIncrementalDownloadWithMatchingChecksum () throws IOException {
1865
+ // Set up the test scenario
1866
+ long generation = 1 ;
1867
+ long primaryTerm = 1 ;
1868
+ long checksum = 1234 ;
1869
+ Path location = createTempDir ();
1870
+ TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata (primaryTerm , generation , generation , 1 );
1871
+ Map <String , String > generationToPrimaryTermMapper = new HashMap <>();
1872
+ Map <String , String > generationToChecksumMapper = new HashMap <>();
1873
+ generationToPrimaryTermMapper .put (String .valueOf (generation ), String .valueOf (primaryTerm ));
1874
+ generationToChecksumMapper .put (String .valueOf (generation ), String .valueOf (checksum ));
1875
+ translogTransferMetadata .setGenerationToPrimaryTermMapper (generationToPrimaryTermMapper );
1876
+ translogTransferMetadata .setGenerationToChecksumMapper (generationToChecksumMapper );
1877
+
1878
+ // Mock the transfer manager.
1879
+ TranslogTransferManager mockTransfer = mock (TranslogTransferManager .class );
1880
+ RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock (RemoteTranslogTransferTracker .class );
1881
+ when (mockTransfer .readMetadata (0 )).thenReturn (translogTransferMetadata );
1882
+ when (mockTransfer .getRemoteTranslogTransferTracker ()).thenReturn (remoteTranslogTransferTracker );
1883
+
1884
+ // Create a local translog file with the same checksum as the remote
1885
+ createTranslogFile (location , generation , checksum );
1886
+
1887
+ // Verify that the download is skipped
1888
+ RemoteFsTranslog .download (mockTransfer , location , logger , false , 0 );
1889
+ verify (mockTransfer , times (0 )).downloadTranslog (any (), any (), any ());
1890
+ }
1891
+
1892
+ /**
1893
+ * testIncrementalDownloadWithDifferentChecksum tests the case where we have 2 translog generations
1894
+ * in remote but only 1 present locally. We will download only 1 generation in this case.
1895
+ * */
1896
+ public void testIncrementalDownloadWithDifferentChecksum () throws IOException {
1897
+ // Set up the test scenario
1898
+ long generation1 = 1 , generation2 = 2 , primaryTerm = 1 ;
1899
+ long checksum1 = 1234 , checksum2 = 5678 ;
1900
+ Path location = createTempDir ();
1901
+
1902
+ TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata (primaryTerm , generation2 , generation1 , 2 );
1903
+ Map <String , String > generationToPrimaryTermMapper = Map .of (
1904
+ String .valueOf (generation1 ),
1905
+ String .valueOf (primaryTerm ),
1906
+ String .valueOf (generation2 ),
1907
+ String .valueOf (primaryTerm )
1908
+ );
1909
+ Map <String , String > generationToChecksumMapper = Map .of (
1910
+ String .valueOf (generation1 ),
1911
+ String .valueOf (checksum1 ),
1912
+ String .valueOf (generation2 ),
1913
+ String .valueOf (checksum2 )
1914
+ );
1915
+ translogTransferMetadata .setGenerationToPrimaryTermMapper (generationToPrimaryTermMapper );
1916
+ translogTransferMetadata .setGenerationToChecksumMapper (generationToChecksumMapper );
1917
+
1918
+ // Mock the transfer manager.
1919
+ TranslogTransferManager mockTransfer = mock (TranslogTransferManager .class );
1920
+ RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock (RemoteTranslogTransferTracker .class );
1921
+ when (mockTransfer .readMetadata (0 )).thenReturn (translogTransferMetadata );
1922
+ when (mockTransfer .getRemoteTranslogTransferTracker ()).thenReturn (remoteTranslogTransferTracker );
1923
+
1924
+ // Create a local translog file for 1 generation.
1925
+ createTranslogFile (location , generation1 , checksum1 );
1926
+ // Download counter to count the files which were downloaded.
1927
+ AtomicLong downloadCounter = new AtomicLong ();
1928
+ // Mock the download of second generation.
1929
+ doAnswer (invocation -> {
1930
+ downloadCounter .incrementAndGet ();
1931
+ Files .createFile (location .resolve (Translog .getCommitCheckpointFileName (generation2 )));
1932
+ return true ;
1933
+ }).when (mockTransfer ).downloadTranslog (String .valueOf (primaryTerm ), String .valueOf (generation2 ), location );
1934
+
1935
+ // Verify that only generation 2 is downloaded.
1936
+ RemoteFsTranslog .download (mockTransfer , location , logger , false , 0 );
1937
+ assertEquals (1 , downloadCounter .get ());
1938
+ // verify that generation 1 is not downloaded.
1939
+ verify (mockTransfer , times (0 )).downloadTranslog (String .valueOf (primaryTerm ), String .valueOf (generation1 ), location );
1940
+ }
1941
+
1808
1942
public void testSyncWithGlobalCheckpointUpdate () throws IOException {
1809
1943
ArrayList <Translog .Operation > ops = new ArrayList <>();
1810
1944
addToTranslogAndListAndUpload (translog , ops , new Translog .Index ("1" , 0 , primaryTerm .get (), new byte [] { 1 }));
0 commit comments