Skip to content

Commit e17ea7c

Browse files
author
Harsh Rawat
committed
added support for incremental download of translog files
Presently, the download workflow for remote backed storage works in a manner which causes the download for same translog files multiple times, each time deleting all the older files before downloading them again. This causes significant wasted network bandwidth, along with the time taken for the shard to become active. This change adds support for downloading the translog files incrementally and omitting the same if they are present locally. Signed-off-by: Harsh Rawat <[email protected]>
1 parent acf209f commit e17ea7c

13 files changed

+407
-35
lines changed

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -254,14 +254,68 @@ private static void downloadOnce(
254254
Files.createDirectories(location);
255255
}
256256

257-
// Delete translog files on local before downloading from remote
257+
Map<String, String> generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper();
258+
Map<String, String> generationToChecksumMapper = translogMetadata.getGenerationToChecksumMapper();
259+
long maxGeneration = translogMetadata.getGeneration();
260+
long minGeneration = translogMetadata.getMinTranslogGeneration();
261+
262+
// Delete any translog and checkpoint file which is not part of the current generation range.
258263
for (Path file : FileSystemUtils.files(location)) {
259-
Files.delete(file);
264+
try {
265+
long generation = parseIdFromFileName(file.getFileName().toString(), STRICT_TLOG_OR_CKP_PATTERN);
266+
if (generation < minGeneration || generation > maxGeneration) {
267+
// If the generation is outside the required range, then we delete the same.
268+
Files.delete(file);
269+
}
270+
} catch (IllegalStateException | IllegalArgumentException e) {
271+
// Delete any file which does not conform to Translog or Checkpoint filename patterns.
272+
Files.delete(file);
273+
}
260274
}
261275

262-
Map<String, String> generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper();
263-
for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) {
276+
for (long i = maxGeneration; i >= minGeneration; i--) {
264277
String generation = Long.toString(i);
278+
279+
// For incremental downloads, we will check if the local translog matches the one present in
280+
// remote store. If so, we will skip its download.
281+
String translogFilename = Translog.getFilename(i);
282+
Path targetTranslogPath = location.resolve(translogFilename);
283+
284+
// If we have the translog available for the generation then we need to
285+
// compare the checksum with that in remote obtained via metadata.
286+
// For backward compatibility, we consider the following cases here-
287+
// - Remote metadata does not have the mapping for generation
288+
// - Local translog file lacks the checksum value in footer
289+
// In both these cases, we will download the files for the generation.
290+
if (generationToChecksumMapper.containsKey(generation) && FileSystemUtils.exists(targetTranslogPath)) {
291+
try {
292+
final long expectedChecksum = Long.parseLong(generationToChecksumMapper.get(generation));
293+
final Long actualChecksum = TranslogFooter.readChecksum(targetTranslogPath);
294+
295+
// If the local and remote checksum are same, then continue.
296+
// Else exit the loop and download the translog.
297+
if (actualChecksum != null && actualChecksum == expectedChecksum) {
298+
logger.info(
299+
"Download skipped for translog and checkpoint files for generation={} due to them being locally present",
300+
generation
301+
);
302+
303+
// Mark the translog and checkpoint file as downloaded in the file tracker.
304+
translogTransferManager.markFileAsDownloaded(translogFilename);
305+
translogTransferManager.markFileAsDownloaded(Translog.getCommitCheckpointFileName(i));
306+
continue;
307+
}
308+
} catch (IOException e) {
309+
// The exception can occur if the remote translog files were uploaded without footer.
310+
logger.info(
311+
"Exception occurred during reconciliation of translog state between local and remote. "
312+
+ "Reverting to downloading the translog and checksum files for generation={}",
313+
generation
314+
);
315+
}
316+
}
317+
318+
logger.info("Downloading translog and checkpoint files for generation={}", generation);
265319
translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location);
266320
}
267321
logger.info(

server/src/main/java/org/opensearch/index/translog/Translog.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ public abstract class Translog extends AbstractIndexShardComponent implements In
134134
public static final String CHECKPOINT_SUFFIX = ".ckp";
135135
public static final String CHECKPOINT_FILE_NAME = "translog" + CHECKPOINT_SUFFIX;
136136

137+
static final Pattern STRICT_TLOG_OR_CKP_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.ckp|\\.tlog)$");
137138
static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$");
138139
public static final int DEFAULT_HEADER_SIZE_IN_BYTES = TranslogHeader.headerSizeInBytes(UUIDs.randomBase64UUID());
139140

@@ -320,14 +321,18 @@ public static long parseIdFromFileName(Path translogFile) {
320321
return parseIdFromFileName(fileName);
321322
}
322323

323-
public static long parseIdFromFileName(String fileName) {
324-
final Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(fileName);
324+
public static long parseIdFromFileName(String translogFile) {
325+
return parseIdFromFileName(translogFile, PARSE_STRICT_ID_PATTERN);
326+
}
327+
328+
public static long parseIdFromFileName(String fileName, Pattern pattern) {
329+
final Matcher matcher = pattern.matcher(fileName);
325330
if (matcher.matches()) {
326331
try {
327332
return Long.parseLong(matcher.group(1));
328333
} catch (NumberFormatException e) {
329334
throw new IllegalStateException(
330-
"number formatting issue in a file that passed PARSE_STRICT_ID_PATTERN: " + fileName + "]",
335+
"number formatting issue in a file that passed " + pattern.pattern() + ": " + fileName + "]",
331336
e
332337
);
333338
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Licensed to Elasticsearch under one or more contributor
11+
* license agreements. See the NOTICE file distributed with
12+
* this work for additional information regarding copyright
13+
* ownership. Elasticsearch licenses this file to you under
14+
* the Apache License, Version 2.0 (the "License"); you may
15+
* not use this file except in compliance with the License.
16+
* You may obtain a copy of the License at
17+
*
18+
* http://www.apache.org/licenses/LICENSE-2.0
19+
*
20+
* Unless required by applicable law or agreed to in writing,
21+
* software distributed under the License is distributed on an
22+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
23+
* KIND, either express or implied. See the License for the
24+
* specific language governing permissions and limitations
25+
* under the License.
26+
*/
27+
28+
/*
29+
* Modifications Copyright OpenSearch Contributors. See
30+
* GitHub history for details.
31+
*/
32+
33+
package org.opensearch.index.translog;
34+
35+
import org.apache.lucene.codecs.CodecUtil;
36+
import org.apache.lucene.store.OutputStreamDataOutput;
37+
import org.opensearch.common.io.Channels;
38+
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
39+
40+
import java.io.ByteArrayOutputStream;
41+
import java.io.IOException;
42+
import java.nio.ByteBuffer;
43+
import java.nio.channels.FileChannel;
44+
import java.nio.file.Path;
45+
import java.nio.file.StandardOpenOption;
46+
47+
/**
48+
* Each translog file is started with a header followed by the translog operations, and ending with a footer.
49+
* The footer encapsulates the checksum of the translog.
50+
* */
51+
public class TranslogFooter {
52+
53+
/**
54+
* footerLength returns the length of the footer.
55+
* We are writing 16 bytes and therefore, we return the same.
56+
*/
57+
static int footerLength() {
58+
return 16;
59+
}
60+
61+
/**
62+
* write the translog footer which records both checksum and algorithm ID.
63+
* This method is based upon the CodecUtils.writeFooter method.
64+
* This footer can be parsed and read with TranslogFooter.readChecksum().
65+
*
66+
* Similar to CodecUtils documentation, the footer consists of-
67+
* Footer --> Magic,AlgorithmID,Checksum
68+
* Magic --> Uint32. This identifies the start of the footer. It is always -1071082520.
69+
* AlgorithmID --> Uint32. This indicates the checksum algorithm used. Currently, this is always 0.
70+
* Checksum --> Uint64. This is the checksum as calculated for the translog.
71+
* */
72+
static byte[] write(FileChannel channel, long checksum, boolean toSync) throws IOException {
73+
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
74+
final OutputStreamDataOutput out = new OutputStreamDataOutput(new OutputStreamStreamOutput(byteArrayOutputStream));
75+
76+
CodecUtil.writeBEInt(out, CodecUtil.FOOTER_MAGIC);
77+
CodecUtil.writeBEInt(out, 0);
78+
CodecUtil.writeBELong(out, checksum);
79+
80+
Channels.writeToChannel(byteArrayOutputStream.toByteArray(), channel);
81+
if (toSync) {
82+
channel.force(false);
83+
}
84+
85+
return byteArrayOutputStream.toByteArray();
86+
}
87+
88+
/**
89+
* readChecksum reads the translog file from the given location and returns the checksum if present in the footer.
90+
* If the translog file is of older version and the footer is not present, then we return null.
91+
* */
92+
static Long readChecksum(Path path) throws IOException {
93+
try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
94+
// Read the header and find out if the footer is supported.
95+
final TranslogHeader header = TranslogHeader.read(path, channel);
96+
if (header.getTranslogHeaderVersion() < TranslogHeader.VERSION_WITH_FOOTER) {
97+
return null;
98+
}
99+
100+
// Read the footer.
101+
final long fileSize = channel.size();
102+
final long footerStart = fileSize - TranslogFooter.footerLength();
103+
ByteBuffer footer = ByteBuffer.allocate(TranslogFooter.footerLength());
104+
int bytesRead = Channels.readFromFileChannel(channel, footerStart, footer);
105+
if (bytesRead != TranslogFooter.footerLength()) {
106+
throw new IOException(
107+
"Read " + bytesRead + " bytes from footer instead of expected " + TranslogFooter.footerLength() + " bytes"
108+
);
109+
}
110+
footer.flip();
111+
112+
// Validate the footer and return the checksum.
113+
int magic = footer.getInt();
114+
if (magic != CodecUtil.FOOTER_MAGIC) {
115+
throw new IOException("Invalid footer magic number: " + magic);
116+
}
117+
118+
int algorithmId = footer.getInt();
119+
if (algorithmId != 0) {
120+
throw new IOException("Unsupported checksum algorithm ID: " + algorithmId);
121+
}
122+
123+
return footer.getLong();
124+
}
125+
}
126+
}

server/src/main/java/org/opensearch/index/translog/TranslogHeader.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,13 @@ public final class TranslogHeader {
6565
public static final int VERSION_CHECKSUMS = 1; // pre-2.0 - unsupported
6666
public static final int VERSION_CHECKPOINTS = 2; // added checkpoints
6767
public static final int VERSION_PRIMARY_TERM = 3; // added primary term
68-
public static final int CURRENT_VERSION = VERSION_PRIMARY_TERM;
68+
public static final int VERSION_WITH_FOOTER = 4; // added the footer for the translog
69+
public static final int CURRENT_VERSION = VERSION_WITH_FOOTER;
6970

7071
private final String translogUUID;
7172
private final long primaryTerm;
7273
private final int headerSizeInBytes;
74+
private final int translogHeaderVersion;
7375

7476
/**
7577
* Creates a new translog header with the given uuid and primary term.
@@ -80,14 +82,17 @@ public final class TranslogHeader {
8082
* All operations' terms in this translog file are enforced to be at most this term.
8183
*/
8284
TranslogHeader(String translogUUID, long primaryTerm) {
83-
this(translogUUID, primaryTerm, headerSizeInBytes(translogUUID));
85+
// When we create Header on the fly, we will use the latest current version as that would include the
86+
// checksum as part of the footer.
87+
this(translogUUID, primaryTerm, headerSizeInBytes(translogUUID), CURRENT_VERSION);
8488
assert primaryTerm >= 0 : "Primary term must be non-negative; term [" + primaryTerm + "]";
8589
}
8690

87-
private TranslogHeader(String translogUUID, long primaryTerm, int headerSizeInBytes) {
91+
private TranslogHeader(String translogUUID, long primaryTerm, int headerSizeInBytes, int headerVersion) {
8892
this.translogUUID = translogUUID;
8993
this.primaryTerm = primaryTerm;
9094
this.headerSizeInBytes = headerSizeInBytes;
95+
this.translogHeaderVersion = headerVersion;
9196
}
9297

9398
public String getTranslogUUID() {
@@ -110,6 +115,13 @@ public int sizeInBytes() {
110115
return headerSizeInBytes;
111116
}
112117

118+
/**
119+
* Returns the version of the translog header.
120+
* */
121+
public int getTranslogHeaderVersion() {
122+
return translogHeaderVersion;
123+
}
124+
113125
static int headerSizeInBytes(String translogUUID) {
114126
return headerSizeInBytes(CURRENT_VERSION, new BytesRef(translogUUID).length);
115127
}
@@ -127,7 +139,7 @@ private static int headerSizeInBytes(int version, int uuidLength) {
127139
static int readHeaderVersion(final Path path, final FileChannel channel, final StreamInput in) throws IOException {
128140
final int version;
129141
try {
130-
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM);
142+
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, CURRENT_VERSION);
131143
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
132144
tryReportOldVersionError(path, channel);
133145
throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e);
@@ -183,7 +195,7 @@ public static TranslogHeader read(final Path path, final FileChannel channel) th
183195
in.read(uuid.bytes, uuid.offset, uuid.length);
184196
// Read the primary term
185197
final long primaryTerm;
186-
if (version == VERSION_PRIMARY_TERM) {
198+
if (version >= VERSION_PRIMARY_TERM) {
187199
primaryTerm = in.readLong();
188200
} else {
189201
assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]";
@@ -202,7 +214,7 @@ public static TranslogHeader read(final Path path, final FileChannel channel) th
202214
+ channel.position()
203215
+ "]";
204216

205-
return new TranslogHeader(uuid.utf8ToString(), primaryTerm, headerSizeInBytes);
217+
return new TranslogHeader(uuid.utf8ToString(), primaryTerm, headerSizeInBytes, version);
206218
} catch (EOFException e) {
207219
throw new TranslogCorruptedException(path.toString(), "translog header truncated", e);
208220
}

0 commit comments

Comments
 (0)