Skip to content

Commit 833b508

Browse files
author
Harsh Rawat
committed
added the unit tests for incremental download of translog
This commit adds the unit tests applicable for the changes made to support incremental download of translog files. Primarily, the unit tests cover the changes in- - TranslogFooter to write and read the footer of Translog - RemoteFSTranslog to skip the download of locally present translog - TranslogWriter to create reader with checksum upon close - TranslogReader closeIntoReader functionality Signed-off-by: Harsh Rawat <[email protected]>
1 parent e17ea7c commit 833b508

File tree

3 files changed

+316
-11
lines changed

3 files changed

+316
-11
lines changed

server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.apache.logging.log4j.message.ParameterizedMessage;
1212
import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil;
13+
import org.apache.lucene.codecs.CodecUtil;
1314
import org.apache.lucene.store.AlreadyClosedException;
1415
import org.apache.lucene.store.ByteArrayDataOutput;
1516
import org.apache.lucene.store.DataOutput;
@@ -19,12 +20,14 @@
1920
import org.opensearch.cluster.metadata.IndexMetadata;
2021
import org.opensearch.cluster.metadata.RepositoryMetadata;
2122
import org.opensearch.cluster.service.ClusterService;
23+
import org.opensearch.common.UUIDs;
2224
import org.opensearch.common.blobstore.BlobContainer;
2325
import org.opensearch.common.blobstore.BlobPath;
2426
import org.opensearch.common.blobstore.BlobStore;
2527
import org.opensearch.common.blobstore.fs.FsBlobContainer;
2628
import org.opensearch.common.blobstore.fs.FsBlobStore;
2729
import org.opensearch.common.bytes.ReleasableBytesReference;
30+
import org.opensearch.common.io.Channels;
2831
import org.opensearch.common.lease.Releasable;
2932
import org.opensearch.common.lease.Releasables;
3033
import org.opensearch.common.settings.ClusterSettings;
@@ -75,6 +78,7 @@
7578
import java.nio.file.Files;
7679
import java.nio.file.NoSuchFileException;
7780
import java.nio.file.Path;
81+
import java.nio.file.StandardOpenOption;
7882
import java.util.ArrayList;
7983
import java.util.Collection;
8084
import java.util.Collections;
@@ -110,6 +114,8 @@
110114
import static org.mockito.ArgumentMatchers.any;
111115
import static org.mockito.Mockito.doAnswer;
112116
import static org.mockito.Mockito.mock;
117+
import static org.mockito.Mockito.times;
118+
import static org.mockito.Mockito.verify;
113119
import static org.mockito.Mockito.when;
114120

115121
@LuceneTestCase.SuppressFileSystems("ExtrasFS")
@@ -1679,6 +1685,24 @@ public void testCloseIntoReader() throws IOException {
16791685
final int value = buffer.getInt();
16801686
assertEquals(i, value);
16811687
}
1688+
1689+
// Try to read into the footer which would lead into EOF exception.
1690+
assertThrowsReadingIntoFooter(reader, numOps, 4);
1691+
// Try to read beyond the footer which would lead into EOF exception.
1692+
assertThrowsReadingIntoFooter(reader, numOps, 18);
1693+
1694+
// Read next 16 bytes directly from the file, which should be the footer.
1695+
// This is because for this test, we create a writer which would automatically
1696+
// create one with footer.
1697+
long translogLengthWithoutFooter = reader.length - TranslogFooter.footerLength();
1698+
ByteBuffer footerBuffer = ByteBuffer.allocate(TranslogFooter.footerLength());
1699+
Channels.readFromFileChannelWithEofException(reader.channel, translogLengthWithoutFooter, footerBuffer);
1700+
footerBuffer.flip();
1701+
// Validate the footer.
1702+
assertEquals(CodecUtil.FOOTER_MAGIC, footerBuffer.getInt());
1703+
assertEquals(0, footerBuffer.getInt()); // Algorithm ID
1704+
assertEquals(reader.getTranslogChecksum().longValue(), footerBuffer.getLong());
1705+
16821706
final Checkpoint readerCheckpoint = reader.getCheckpoint();
16831707
assertThat(readerCheckpoint, equalTo(writerCheckpoint));
16841708
} finally {
@@ -1687,6 +1711,12 @@ public void testCloseIntoReader() throws IOException {
16871711
}
16881712
}
16891713

1714+
// assertThrowsReadingIntoFooter asserts EOF error when we try reading into the Translog footer via reader.
1715+
private void assertThrowsReadingIntoFooter(TranslogReader reader, int numOps, int bytesToRead) {
1716+
final ByteBuffer buffer = ByteBuffer.allocate(bytesToRead);
1717+
assertThrows(EOFException.class, () -> reader.readBytes(buffer, reader.getFirstOperationOffset() + numOps * 4));
1718+
}
1719+
16901720
public void testDownloadWithRetries() throws IOException {
16911721
long generation = 1, primaryTerm = 1;
16921722
Path location = createTempDir();
@@ -1805,6 +1835,110 @@ public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException {
18051835
assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload);
18061836
}
18071837

1838+
/**
1839+
* createTranslogFile creates a translog file with the given generation and checksum at the provided location.
1840+
* */
1841+
private void createTranslogFile(Path location, long generation, long checksum) throws IOException {
1842+
Path translogPath = location.resolve(Translog.getFilename(generation));
1843+
Path checkpointPath = location.resolve(Translog.getCommitCheckpointFileName(generation));
1844+
Files.createFile(translogPath);
1845+
Files.createFile(checkpointPath);
1846+
try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) {
1847+
// Write a translog header
1848+
TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), generation);
1849+
header.write(channel, true);
1850+
1851+
// Write some translog operations
1852+
byte[] operationBytes = new byte[] { 1, 2, 3, 4 };
1853+
channel.write(ByteBuffer.wrap(operationBytes));
1854+
1855+
// Write the translog footer
1856+
TranslogFooter.write(channel, checksum, true);
1857+
}
1858+
}
1859+
1860+
/**
1861+
* testIncrementalDownloadWithMatchingChecksum tests the scenario where we have the translog
1862+
* file present locally. We test if the download logic for the same skips the download of the file.
1863+
* */
1864+
public void testIncrementalDownloadWithMatchingChecksum() throws IOException {
1865+
// Set up the test scenario
1866+
long generation = 1;
1867+
long primaryTerm = 1;
1868+
long checksum = 1234;
1869+
Path location = createTempDir();
1870+
TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation, generation, 1);
1871+
Map<String, String> generationToPrimaryTermMapper = new HashMap<>();
1872+
Map<String, String> generationToChecksumMapper = new HashMap<>();
1873+
generationToPrimaryTermMapper.put(String.valueOf(generation), String.valueOf(primaryTerm));
1874+
generationToChecksumMapper.put(String.valueOf(generation), String.valueOf(checksum));
1875+
translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper);
1876+
translogTransferMetadata.setGenerationToChecksumMapper(generationToChecksumMapper);
1877+
1878+
// Mock the transfer manager.
1879+
TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
1880+
RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);
1881+
when(mockTransfer.readMetadata(0)).thenReturn(translogTransferMetadata);
1882+
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);
1883+
1884+
// Create a local translog file with the same checksum as the remote
1885+
createTranslogFile(location, generation, checksum);
1886+
1887+
// Verify that the download is skipped
1888+
RemoteFsTranslog.download(mockTransfer, location, logger, false, 0);
1889+
verify(mockTransfer, times(0)).downloadTranslog(any(), any(), any());
1890+
}
1891+
1892+
/**
1893+
* testIncrementalDownloadWithDifferentChecksum tests the case where we have 2 translog generations
1894+
* in remote but only 1 present locally. We will download only 1 generation in this case.
1895+
* */
1896+
public void testIncrementalDownloadWithDifferentChecksum() throws IOException {
1897+
// Set up the test scenario
1898+
long generation1 = 1, generation2 = 2, primaryTerm = 1;
1899+
long checksum1 = 1234, checksum2 = 5678;
1900+
Path location = createTempDir();
1901+
1902+
TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation2, generation1, 2);
1903+
Map<String, String> generationToPrimaryTermMapper = Map.of(
1904+
String.valueOf(generation1),
1905+
String.valueOf(primaryTerm),
1906+
String.valueOf(generation2),
1907+
String.valueOf(primaryTerm)
1908+
);
1909+
Map<String, String> generationToChecksumMapper = Map.of(
1910+
String.valueOf(generation1),
1911+
String.valueOf(checksum1),
1912+
String.valueOf(generation2),
1913+
String.valueOf(checksum2)
1914+
);
1915+
translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper);
1916+
translogTransferMetadata.setGenerationToChecksumMapper(generationToChecksumMapper);
1917+
1918+
// Mock the transfer manager.
1919+
TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
1920+
RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);
1921+
when(mockTransfer.readMetadata(0)).thenReturn(translogTransferMetadata);
1922+
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);
1923+
1924+
// Create a local translog file for 1 generation.
1925+
createTranslogFile(location, generation1, checksum1);
1926+
// Download counter to count the files which were downloaded.
1927+
AtomicLong downloadCounter = new AtomicLong();
1928+
// Mock the download of second generation.
1929+
doAnswer(invocation -> {
1930+
downloadCounter.incrementAndGet();
1931+
Files.createFile(location.resolve(Translog.getCommitCheckpointFileName(generation2)));
1932+
return true;
1933+
}).when(mockTransfer).downloadTranslog(String.valueOf(primaryTerm), String.valueOf(generation2), location);
1934+
1935+
// Verify that only generation 2 is downloaded.
1936+
RemoteFsTranslog.download(mockTransfer, location, logger, false, 0);
1937+
assertEquals(1, downloadCounter.get());
1938+
// verify that generation 1 is not downloaded.
1939+
verify(mockTransfer, times(0)).downloadTranslog(String.valueOf(primaryTerm), String.valueOf(generation1), location);
1940+
}
1941+
18081942
public void testSyncWithGlobalCheckpointUpdate() throws IOException {
18091943
ArrayList<Translog.Operation> ops = new ArrayList<>();
18101944
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Licensed to Elasticsearch under one or more contributor
11+
* license agreements. See the NOTICE file distributed with
12+
* this work for additional information regarding copyright
13+
* ownership. Elasticsearch licenses this file to you under
14+
* the Apache License, Version 2.0 (the "License"); you may
15+
* not use this file except in compliance with the License.
16+
* You may obtain a copy of the License at
17+
*
18+
* http://www.apache.org/licenses/LICENSE-2.0
19+
*
20+
* Unless required by applicable law or agreed to in writing,
21+
* software distributed under the License is distributed on an
22+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
23+
* KIND, either express or implied. See the License for the
24+
* specific language governing permissions and limitations
25+
* under the License.
26+
*/
27+
28+
/*
29+
* Modifications Copyright OpenSearch Contributors. See
30+
* GitHub history for details.
31+
*/
32+
33+
package org.opensearch.index.translog;
34+
35+
import org.apache.lucene.codecs.CodecUtil;
36+
import org.opensearch.common.UUIDs;
37+
import org.opensearch.test.OpenSearchTestCase;
38+
39+
import java.io.IOException;
40+
import java.nio.ByteBuffer;
41+
import java.nio.channels.FileChannel;
42+
import java.nio.file.Path;
43+
import java.nio.file.StandardOpenOption;
44+
45+
public class TranslogFooterTests extends OpenSearchTestCase {
46+
47+
/**
48+
* testTranslogFooterWrite verifies the functionality of TranslogFooter.write() method
49+
* wherein we write the footer to the translog file.
50+
* */
51+
public void testTranslogFooterWrite() throws IOException {
52+
Path translogPath = createTempFile();
53+
try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) {
54+
// Write a translog header
55+
TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), randomNonNegativeLong());
56+
header.write(channel, true);
57+
58+
// Write some translog operations
59+
byte[] operationBytes = new byte[] { 1, 2, 3, 4 };
60+
channel.write(ByteBuffer.wrap(operationBytes));
61+
62+
// Write the translog footer
63+
long expectedChecksum = 0x1234567890ABCDEFL;
64+
byte[] footer = TranslogFooter.write(channel, expectedChecksum, true);
65+
66+
// Verify the footer contents
67+
ByteBuffer footerBuffer = ByteBuffer.wrap(footer);
68+
assertEquals(CodecUtil.FOOTER_MAGIC, footerBuffer.getInt());
69+
assertEquals(0, footerBuffer.getInt());
70+
assertEquals(expectedChecksum, footerBuffer.getLong());
71+
72+
// Verify that the footer was written to the channel
73+
assertEquals(footer.length, channel.size() - (header.sizeInBytes() + operationBytes.length));
74+
}
75+
}
76+
77+
/**
78+
* testTranslogFooterReadChecksum verifies the behavior of the TranslogFooter.readChecksum() method,
79+
* which reads the checksum from the footer of a translog file.
80+
* */
81+
public void testTranslogFooterReadChecksum() throws IOException {
82+
long expectedChecksum = 0x1234567890ABCDEFL;
83+
Path translogPath = createTempFile();
84+
try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) {
85+
// Write a translog header
86+
TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), randomNonNegativeLong());
87+
header.write(channel, true);
88+
89+
// Write some translog operations
90+
byte[] operationBytes = new byte[] { 1, 2, 3, 4 };
91+
channel.write(ByteBuffer.wrap(operationBytes));
92+
93+
// Write the translog footer.
94+
TranslogFooter.write(channel, expectedChecksum, true);
95+
}
96+
97+
// Verify that the checksum can be read correctly
98+
Long actualChecksum = TranslogFooter.readChecksum(translogPath);
99+
assert actualChecksum != null;
100+
assertEquals(expectedChecksum, actualChecksum.longValue());
101+
}
102+
}

0 commit comments

Comments
 (0)