
Commit e90a1c2

chore: 19096: Improve DataFileWriter: cache FileChannel and deprecate itemsCount
Fixes: #19096
Reviewed-by: Anthony Petrov <[email protected]>, Artem Ananev <[email protected]>, Roger Barker <[email protected]>
Signed-off-by: Artur Kugal <[email protected]>
1 parent 82ea7e2 commit e90a1c2
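
As a rough illustration of the first part of the title, "cache FileChannel" means holding a single open channel in a field and reusing it for every write instead of re-acquiring it per operation. The class below is only a hedged sketch of that pattern; it is not the real com.swirlds.merkledb.files.DataFileWriter implementation, and all names in it are illustrative.

// Illustrative sketch of the "cache the FileChannel" idea from the commit title.
// Not the actual DataFileWriter internals; class and method names are made up.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class CachedChannelWriterSketch implements AutoCloseable {

    // Opened once and reused for every write instead of being re-opened per call.
    private final FileChannel channel;

    CachedChannelWriterSketch(final Path file) throws IOException {
        this.channel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    void storeDataItem(final ByteBuffer data) throws IOException {
        channel.write(data); // no per-call channel setup
    }

    @Override
    public void close() throws IOException {
        channel.force(true); // flush to disk, then release the cached channel
        channel.close();
    }
}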

File tree

16 files changed: +464 -293 lines changed

…/com/swirlds/base/test/fixtures/util/DataUtils.java (new file)

@@ -0,0 +1,43 @@
// SPDX-License-Identifier: Apache-2.0
package com.swirlds.base.test.fixtures.util;

import java.util.Random;
import java.util.UUID;

/**
 * Utility class to generate some primitive data for testing purposes.
 */
public final class DataUtils {

    private DataUtils() {
        // Prevent instantiation
    }

    public static int[] shuffle(Random random, final int[] array) {
        if (random == null) {
            random = new Random();
        }
        final int count = array.length;
        for (int i = count; i > 1; i--) {
            swap(array, i - 1, random.nextInt(i));
        }
        return array;
    }

    public static void swap(final int[] array, final int i, final int j) {
        final int temp = array[i];
        array[i] = array[j];
        array[j] = temp;
    }

    public static byte[] randomUtf8Bytes(final int n) {
        final byte[] data = new byte[n];
        int i = 0;
        while (i < n) {
            final byte[] rnd = UUID.randomUUID().toString().getBytes();
            System.arraycopy(rnd, 0, data, i, Math.min(rnd.length, n - 1 - i));
            i += rnd.length;
        }
        return data;
    }
}

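A hypothetical snippet showing how a test might use the new fixture (not part of the commit):

// Hypothetical usage of the DataUtils fixture above; not part of this commit.
import com.swirlds.base.test.fixtures.util.DataUtils;
import java.util.Arrays;
import java.util.Random;

final class DataUtilsUsageExample {
    public static void main(final String[] args) {
        // Random UTF-8 payload of 256 bytes assembled from UUID strings.
        final byte[] payload = DataUtils.randomUtf8Bytes(256);

        // Shuffle an index array in place; a seeded Random keeps runs reproducible.
        final int[] indexes = {0, 1, 2, 3, 4};
        DataUtils.shuffle(new Random(42), indexes);

        System.out.println(payload.length + " bytes, shuffled indexes: " + Arrays.toString(indexes));
    }
}
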
platform-sdk/swirlds-merkledb/build.gradle.kts

+4 -1

@@ -16,7 +16,10 @@ tasks.withType<JavaCompile>().configureEach {
 
 mainModuleInfo { annotationProcessor("com.swirlds.config.processor") }
 
-jmhModuleInfo { requires("jmh.core") }
+jmhModuleInfo {
+    requires("jmh.core")
+    requires("com.swirlds.base.test.fixtures")
+}
 
 testModuleInfo {
     requires("com.swirlds.common.test.fixtures")

platform-sdk/swirlds-merkledb/src/hammer/java/com/swirlds/merkledb/files/DataFileReaderHammerTest.java

+1 -1

@@ -54,7 +54,7 @@ void interruptedReadsHammerTest() throws Exception {
         final ExecutorService exec = Executors.newFixedThreadPool(readerThreads);
         final Random rand = new Random();
         final MerkleDbConfig dbConfig = CONFIGURATION.getConfigData(MerkleDbConfig.class);
-        final DataFileMetadata metadata = new DataFileMetadata(itemCount, 0, Instant.now(), INITIAL_COMPACTION_LEVEL);
+        final DataFileMetadata metadata = new DataFileMetadata(0, Instant.now(), INITIAL_COMPACTION_LEVEL);
         final DataFileReader dataReader = new DataFileReader(dbConfig, tempFile, metadata);
         final AtomicInteger activeReaders = new AtomicInteger(readerThreads);
         final AtomicReferenceArray<Thread> threads = new AtomicReferenceArray<>(readerThreads);
…/com/swirlds/benchmark/DataFileWriterBenchmark.java (new file)

@@ -0,0 +1,138 @@
// SPDX-License-Identifier: Apache-2.0
package com.swirlds.benchmark;

import static com.swirlds.base.units.UnitConstants.MEBIBYTES_TO_BYTES;

import com.hedera.pbj.runtime.io.buffer.BufferedData;
import com.swirlds.base.test.fixtures.util.DataUtils;
import com.swirlds.common.io.utility.FileUtils;
import com.swirlds.merkledb.files.DataFileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;

/**
 * This benchmark measures the performance of the {@link DataFileWriter} class.
 * It uses a pregenerated list of data items so that only writing speed is measured.
 */
@State(Scope.Benchmark)
public class DataFileWriterBenchmark {

    /**
     * Size of the buffer in MB; <b>should not</b> impact performance with the current mapped-buffer implementation.
     */
    @Param({"16", "64", "128", "256"})
    public int bufferSizeMb;

    /**
     * Maximum size of the file to write, in MB.
     */
    @Param({"50", "200", "500"})
    public int maxFileSizeMb;

    /**
     * Number of sample data items to pre-generate before each iteration.
     */
    @Param({"10"})
    public int sampleSize;

    /**
     * Size range of the sample data items in bytes; items are picked randomly from the sample.
     */
    // the first range covers small data items such as hashes
    @Param({"56-56", "300-1000", "1024-8192"})
    public String sampleRangeBytes;

    // Runtime variables
    private Random random;
    private BufferedData[] sampleData;
    private long maxFileSize;

    private Path benchmarkDir;
    private DataFileWriter dataFileWriter;

    @Setup(Level.Trial)
    public void setupGlobal() throws IOException {
        random = new Random(1234);
        benchmarkDir = Files.createTempDirectory("dataFileWriterBenchmark");

        maxFileSize = maxFileSizeMb * MEBIBYTES_TO_BYTES;

        // Generate sample data
        String[] range = sampleRangeBytes.split("-");
        int sampleMinLength = Integer.parseInt(range[0]);
        int sampleMaxLength = Integer.parseInt(range[1]);

        sampleData = new BufferedData[sampleSize];
        for (int i = 0; i < sampleSize; i++) {
            sampleData[i] =
                    BufferedData.wrap(DataUtils.randomUtf8Bytes(random.nextInt(sampleMinLength, sampleMaxLength + 1)));
        }

        System.out.println("Sample data sizes in bytes: "
                + Arrays.toString(Arrays.stream(sampleData)
                        .mapToLong(BufferedData::length)
                        .toArray()));
    }

    @TearDown(Level.Trial)
    public void tearDownGlobal() throws IOException {
        if (benchmarkDir != null) {
            FileUtils.deleteDirectory(benchmarkDir);
        }
    }

    @Setup(Level.Invocation)
    public void setup() throws IOException {
        dataFileWriter =
                new DataFileWriter("test", benchmarkDir, 1, Instant.now(), 1, bufferSizeMb * MEBIBYTES_TO_BYTES);
    }

    @TearDown(Level.Invocation)
    public void tearDown() throws IOException {
        dataFileWriter.close();
    }

    @Benchmark
    @BenchmarkMode(Mode.AverageTime)
    @Measurement(iterations = 1, time = 3, timeUnit = TimeUnit.SECONDS)
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    @Fork(value = 1, warmups = 0)
    @Warmup(iterations = 0)
    public void writeInFile() throws IOException {
        long fileSize = 0;
        BufferedData data;

        while (true) {
            data = getRandomData();
            fileSize += data.length();
            if (fileSize > maxFileSize) {
                break;
            }

            dataFileWriter.storeDataItem(data);
            data.flip();
        }
    }

    private BufferedData getRandomData() {
        return sampleData[random.nextInt(sampleSize)];
    }
}

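Besides the project's Gradle JMH setup, a benchmark like this can also be launched through the standard JMH runner API. The snippet below is a sketch under the assumption that the JMH runner and the benchmark class are on the classpath; the class name DataFileWriterBenchmarkRunner is made up.

// Sketch: launching the benchmark programmatically with the standard JMH runner API.
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class DataFileWriterBenchmarkRunner {
    public static void main(final String[] args) throws RunnerException {
        final Options options = new OptionsBuilder()
                .include("DataFileWriterBenchmark") // regex matched against benchmark class names
                // Narrow the parameter space for a quick local run.
                .param("bufferSizeMb", "64")
                .param("maxFileSizeMb", "200")
                .build();
        new Runner(options).run();
    }
}
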
platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/DataFileCollection.java

+2 -2

@@ -319,7 +319,7 @@ public void close() throws IOException {
         // finish writing if we still are
         final DataFileWriter currentDataFileForWriting = currentDataFileWriter.getAndSet(null);
         if (currentDataFileForWriting != null) {
-            currentDataFileForWriting.finishWriting();
+            currentDataFileForWriting.close();
         }
         // calling startSnapshot causes the metadata file to be written
         saveMetadata(storeDir);
@@ -401,7 +401,7 @@ public DataFileReader endWriting(final long minimumValidKey, final long maximumV
             throw new IOException("Tried to end writing when we never started writing.");
         }
         // finish writing the file and write its footer
-        dataWriter.finishWriting();
+        dataWriter.close();
         final DataFileReader dataReader = currentDataFileReader.getAndSet(null);
         if (logger.isTraceEnabled()) {
             final DataFileMetadata metadata = dataReader.getMetadata();

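Since finishWriting() is now called close(), DataFileWriter follows the conventional AutoCloseable naming. Assuming it actually implements AutoCloseable (not confirmed by this diff), callers that own a writer for a single scope could lean on try-with-resources. A hedged sketch reusing the constructor shape from the benchmark above:

// Sketch only: assumes DataFileWriter implements AutoCloseable after the rename.
import com.hedera.pbj.runtime.io.buffer.BufferedData;
import com.swirlds.merkledb.files.DataFileWriter;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;

final class WriterCloseSketch {
    static void writeSingleItem(final Path dir, final BufferedData item) throws IOException {
        // Constructor arguments mirror DataFileWriterBenchmark.setup() above.
        try (DataFileWriter writer = new DataFileWriter("test", dir, 1, Instant.now(), 1, 16 * 1024 * 1024)) {
            writer.storeDataItem(item);
        } // close() replaces the old finishWriting() call
    }
}
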
platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/DataFileCompactor.java

+1 -1

@@ -318,7 +318,7 @@ private void startNewCompactionFile(int compactionLevel) throws IOException {
      * @throws IOException If an I/O error occurs
      */
     private void finishCurrentCompactionFile() throws IOException {
-        currentWriter.get().finishWriting();
+        currentWriter.get().close();
         currentWriter.set(null);
         // Now include the file in future compactions
         currentReader.get().setFileCompleted();

platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/DataFileIterator.java

+2 -15

@@ -16,8 +16,6 @@
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.Objects;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 /**
  * Iterator class for iterating over data items in a data file created by {@link DataFileWriter}.
@@ -30,8 +28,6 @@
  */
 public final class DataFileIterator implements AutoCloseable {
 
-    private static final Logger logger = LogManager.getLogger(DataFileIterator.class);
-
     /** Input stream this iterator is reading from */
     private final BufferedInputStream inputStream;
     /** Readable data on top of the input stream */
@@ -113,20 +109,11 @@ public boolean next() throws IOException {
             throw new IllegalStateException("Cannot read from a closed iterator");
         }
 
-        // Have we reached the end?
-        if (currentDataItem >= metadata.getDataItemCount() - 1) {
-            if (in.hasRemaining()) {
-                logger.warn(
-                        "Data file has more data than expected items={}. {}", metadata.getDataItemCount(), toString());
-            }
-            dataItemBuffer = null;
-            return false;
-        }
-
         while (in.hasRemaining()) {
             currentDataItemFilePosition = in.position();
             final int tag = in.readVarInt(false);
             final int fieldNum = tag >> TAG_FIELD_OFFSET;
+
             if (fieldNum == FIELD_DATAFILE_ITEMS.number()) {
                 final int currentDataItemSize = in.readVarInt(false);
                 dataItemBuffer = fillBuffer(currentDataItemSize);
@@ -140,7 +127,7 @@ public boolean next() throws IOException {
             }
         }
 
-        throw new IllegalStateException("Reached the end of data file while expecting more data items");
+        return false;
     }
 
     /**

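With the item-count bookkeeping removed, next() simply returns false once the underlying buffer has no more data, so a caller just loops until it does. A small sketch of that loop (how the iterator is constructed and closed is not shown in this diff, so both are left to the caller here):

// Sketch of consuming a DataFileIterator after this change: next() now returns
// false at end-of-data instead of consulting the deprecated item count.
import com.swirlds.merkledb.files.DataFileIterator;
import java.io.IOException;

final class IteratorLoopSketch {
    static long countItems(final DataFileIterator iterator) throws IOException {
        long count = 0;
        while (iterator.next()) { // false once there are no more data items
            count++;
        }
        return count; // closing the iterator is the caller's responsibility
    }
}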