diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/hints/schemas/V059HintsSchema.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/hints/schemas/V059HintsSchema.java index d727d88d4910..a3502ee31042 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/hints/schemas/V059HintsSchema.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/hints/schemas/V059HintsSchema.java @@ -34,8 +34,8 @@ public class V059HintsSchema extends Schema { private static final SemanticVersion VERSION = SemanticVersion.newBuilder().minor(59).build(); - private static final long MAX_HINTS = 1L << 31; - private static final long MAX_PREPROCESSING_VOTES = 1L << 31; + private static final long MAX_HINTS = 1L << 10; + private static final long MAX_PREPROCESSING_VOTES = 1L << 10; public static final String HINTS_KEY_SETS_KEY = "HINTS_KEY_SETS"; public static final String ACTIVE_HINT_CONSTRUCTION_KEY = "ACTIVE_HINT_CONSTRUCTION"; diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/hints/schemas/V060HintsSchema.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/hints/schemas/V060HintsSchema.java index 3e5cba1ad8c7..845f5de78ff2 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/hints/schemas/V060HintsSchema.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/hints/schemas/V060HintsSchema.java @@ -30,7 +30,7 @@ public class V060HintsSchema extends Schema { public static final String CRS_PUBLICATIONS_KEY = "CRS_PUBLICATIONS"; private static final Logger log = LogManager.getLogger(V060HintsSchema.class); - private static final long MAX_CRS_PUBLICATIONS = 1L << 31; + private static final long MAX_CRS_PUBLICATIONS = 1L << 10; private final HintsContext signingContext; private final HintsLibrary library; diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/state/merkle/MerkleSchemaRegistryTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/state/merkle/MerkleSchemaRegistryTest.java index 1df20ef9a237..7f98a721bcba 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/state/merkle/MerkleSchemaRegistryTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/state/merkle/MerkleSchemaRegistryTest.java @@ -75,7 +75,9 @@ void setUp() { final var hederaConfig = mock(HederaConfig.class); lenient().when(config.getConfigData(HederaConfig.class)).thenReturn(hederaConfig); final var merkleDbConfig = mock(MerkleDbConfig.class); + lenient().when(merkleDbConfig.goodAverageBucketEntryCount()).thenReturn(32); lenient().when(merkleDbConfig.longListChunkSize()).thenReturn(1024); + lenient().when(merkleDbConfig.maxNumOfKeys()).thenReturn(1000L); lenient().when(config.getConfigData(MerkleDbConfig.class)).thenReturn(merkleDbConfig); final var virtualMapConfig = mock(VirtualMapConfig.class); lenient().when(config.getConfigData(VirtualMapConfig.class)).thenReturn(virtualMapConfig); diff --git a/hedera-node/hedera-consensus-service-impl/src/main/java/com/hedera/node/app/service/consensus/impl/schemas/V0490ConsensusSchema.java b/hedera-node/hedera-consensus-service-impl/src/main/java/com/hedera/node/app/service/consensus/impl/schemas/V0490ConsensusSchema.java index 4f66be02e946..59f40ec0753c 100644 --- a/hedera-node/hedera-consensus-service-impl/src/main/java/com/hedera/node/app/service/consensus/impl/schemas/V0490ConsensusSchema.java +++ 
b/hedera-node/hedera-consensus-service-impl/src/main/java/com/hedera/node/app/service/consensus/impl/schemas/V0490ConsensusSchema.java @@ -25,7 +25,7 @@ public class V0490ConsensusSchema extends Schema { private static final SemanticVersion VERSION = SemanticVersion.newBuilder().major(0).minor(49).patch(0).build(); - private static final long MAX_TOPICS = 1_000_000_000L; + private static final long MAX_TOPICS = 1_000_000L; /** * Constructor for this schema. diff --git a/hedera-node/hedera-file-service-impl/src/main/java/com/hedera/node/app/service/file/impl/schemas/V0490FileSchema.java b/hedera-node/hedera-file-service-impl/src/main/java/com/hedera/node/app/service/file/impl/schemas/V0490FileSchema.java index 6a9672ff9fef..f8ca1e330a94 100644 --- a/hedera-node/hedera-file-service-impl/src/main/java/com/hedera/node/app/service/file/impl/schemas/V0490FileSchema.java +++ b/hedera-node/hedera-file-service-impl/src/main/java/com/hedera/node/app/service/file/impl/schemas/V0490FileSchema.java @@ -92,10 +92,14 @@ public class V0490FileSchema extends Schema { private static final String DEFAULT_THROTTLES_RESOURCE = "genesis/throttles.json"; /** - * A hint to the database system of the maximum number of files we will store. This MUST NOT BE CHANGED. If it is - * changed, then the database has to be rebuilt. + * A hint to the database system of the expected maximum number of files we will store. This hint + * is used by the database to optimize its indices. If more than this number of files are actually + * stored, the database can handle that just fine. + * + *

If this number is changed, it will not have any effect on existing networks. Only new + * deployments will use the updated hint. */ - private static final int MAX_FILES_HINT = 50_000_000; + private static final int MAX_FILES_HINT = 50_000; /** * The version of the schema. */ diff --git a/hedera-node/hedera-schedule-service-impl/src/main/java/com/hedera/node/app/service/schedule/impl/schemas/V0490ScheduleSchema.java b/hedera-node/hedera-schedule-service-impl/src/main/java/com/hedera/node/app/service/schedule/impl/schemas/V0490ScheduleSchema.java index 25ae11b3952c..04ad0e1cb2c9 100644 --- a/hedera-node/hedera-schedule-service-impl/src/main/java/com/hedera/node/app/service/schedule/impl/schemas/V0490ScheduleSchema.java +++ b/hedera-node/hedera-schedule-service-impl/src/main/java/com/hedera/node/app/service/schedule/impl/schemas/V0490ScheduleSchema.java @@ -17,9 +17,9 @@ * General schema for the schedule service. */ public final class V0490ScheduleSchema extends Schema { - private static final long MAX_SCHEDULES_BY_ID_KEY = 50_000_000L; - private static final long MAX_SCHEDULES_BY_EXPIRY_SEC_KEY = 50_000_000L; - private static final long MAX_SCHEDULES_BY_EQUALITY = 50_000_000L; + private static final long MAX_SCHEDULES_BY_ID_KEY = 50_000L; + private static final long MAX_SCHEDULES_BY_EXPIRY_SEC_KEY = 50_000L; + private static final long MAX_SCHEDULES_BY_EQUALITY = 50_000L; /** * The version of the schema. */ diff --git a/hedera-node/hedera-schedule-service-impl/src/main/java/com/hedera/node/app/service/schedule/impl/schemas/V0570ScheduleSchema.java b/hedera-node/hedera-schedule-service-impl/src/main/java/com/hedera/node/app/service/schedule/impl/schemas/V0570ScheduleSchema.java index 420ba2c18adc..4f2d6f6b6823 100644 --- a/hedera-node/hedera-schedule-service-impl/src/main/java/com/hedera/node/app/service/schedule/impl/schemas/V0570ScheduleSchema.java +++ b/hedera-node/hedera-schedule-service-impl/src/main/java/com/hedera/node/app/service/schedule/impl/schemas/V0570ScheduleSchema.java @@ -24,10 +24,10 @@ public final class V0570ScheduleSchema extends Schema { private static final Logger log = LogManager.getLogger(V0570ScheduleSchema.class); - private static final long MAX_SCHEDULED_COUNTS = 50_000_000L; - private static final long MAX_SCHEDULED_ORDERS = 50_000_000L; - private static final long MAX_SCHEDULED_USAGES = 50_000_000L; - private static final long MAX_SCHEDULE_ID_BY_EQUALITY = 50_000_000L; + private static final long MAX_SCHEDULED_COUNTS = 50_000L; + private static final long MAX_SCHEDULED_ORDERS = 50_000L; + private static final long MAX_SCHEDULED_USAGES = 50_000L; + private static final long MAX_SCHEDULE_ID_BY_EQUALITY = 50_000L; private static final SemanticVersion VERSION = SemanticVersion.newBuilder().major(0).minor(57).patch(0).build(); /** diff --git a/hedera-node/hedera-smart-contract-service-impl/src/main/java/com/hedera/node/app/service/contract/impl/schemas/V0490ContractSchema.java b/hedera-node/hedera-smart-contract-service-impl/src/main/java/com/hedera/node/app/service/contract/impl/schemas/V0490ContractSchema.java index 89c5c2e9b144..0f427384ce80 100644 --- a/hedera-node/hedera-smart-contract-service-impl/src/main/java/com/hedera/node/app/service/contract/impl/schemas/V0490ContractSchema.java +++ b/hedera-node/hedera-smart-contract-service-impl/src/main/java/com/hedera/node/app/service/contract/impl/schemas/V0490ContractSchema.java @@ -18,8 +18,8 @@ * for both the contract storage and bytecode. 
*/ public class V0490ContractSchema extends Schema { - private static final int MAX_BYTECODES = 50_000_000; - private static final int MAX_STORAGE_ENTRIES = 1_000_000_000; + private static final int MAX_BYTECODES = 50_000; + private static final int MAX_STORAGE_ENTRIES = 1_000_000; private static final SemanticVersion VERSION = SemanticVersion.newBuilder().major(0).minor(49).patch(0).build(); diff --git a/hedera-node/hedera-token-service-impl/src/main/java/com/hedera/node/app/service/token/impl/schemas/V0490TokenSchema.java b/hedera-node/hedera-token-service-impl/src/main/java/com/hedera/node/app/service/token/impl/schemas/V0490TokenSchema.java index 76f4c3632bc8..037709ef5a22 100644 --- a/hedera-node/hedera-token-service-impl/src/main/java/com/hedera/node/app/service/token/impl/schemas/V0490TokenSchema.java +++ b/hedera-node/hedera-token-service-impl/src/main/java/com/hedera/node/app/service/token/impl/schemas/V0490TokenSchema.java @@ -24,14 +24,17 @@ * Initial mod-service schema for the token service. */ public class V0490TokenSchema extends Schema { - // These need to be big so databases are created at right scale. If they are too small then the on disk hash map - // buckets will be too full which results in very poor performance. Have chosen 10 billion as should give us - // plenty of runway. - private static final long MAX_STAKING_INFOS = 1_000_000L; - private static final long MAX_TOKENS = 1_000_000_000L; - private static final long MAX_ACCOUNTS = 1_000_000_000L; - private static final long MAX_TOKEN_RELS = 1_000_000_000L; - private static final long MAX_MINTABLE_NFTS = 1_000_000_000L; + + // Initial virtual map capacity values. Previously these values had to be large enough to avoid + // key hash collisions at the database level, which would result in low performance. With the + // new feature of dynamic hash map resizing in the database, these capacity hints can be kept + // low. These are just hints for initial virtual map sizes. 
Over time the maps will be able to + // contain more elements, if needed + private static final long MAX_STAKING_INFOS = 1_000L; + private static final long MAX_TOKENS = 1_000_000L; + private static final long MAX_ACCOUNTS = 1_000_000L; + private static final long MAX_TOKEN_RELS = 1_000_000L; + private static final long MAX_MINTABLE_NFTS = 1_000_000L; private static final SemanticVersion VERSION = SemanticVersion.newBuilder().major(0).minor(49).patch(0).build(); diff --git a/hedera-node/hedera-token-service-impl/src/main/java/com/hedera/node/app/service/token/impl/schemas/V0530TokenSchema.java b/hedera-node/hedera-token-service-impl/src/main/java/com/hedera/node/app/service/token/impl/schemas/V0530TokenSchema.java index 4b00bf7a340b..9c25b9851146 100644 --- a/hedera-node/hedera-token-service-impl/src/main/java/com/hedera/node/app/service/token/impl/schemas/V0530TokenSchema.java +++ b/hedera-node/hedera-token-service-impl/src/main/java/com/hedera/node/app/service/token/impl/schemas/V0530TokenSchema.java @@ -13,7 +13,7 @@ public class V0530TokenSchema extends Schema { private static final Logger logger = LogManager.getLogger(V0530TokenSchema.class); - private static final long MAX_PENDING_AIRDROPS = 1_000_000_000L; + private static final long MAX_PENDING_AIRDROPS = 1_000_000L; public static final String AIRDROPS_KEY = "PENDING_AIRDROPS"; private static final SemanticVersion VERSION = diff --git a/platform-sdk/docs/proposals/scaling-hdhm-buckets/bucket-entries-cleanup.svg b/platform-sdk/docs/proposals/scaling-hdhm-buckets/bucket-entries-cleanup.svg new file mode 100644 index 000000000000..563170a4180f --- /dev/null +++ b/platform-sdk/docs/proposals/scaling-hdhm-buckets/bucket-entries-cleanup.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/platform-sdk/docs/proposals/scaling-hdhm-buckets/bucket-index-after-resize.svg b/platform-sdk/docs/proposals/scaling-hdhm-buckets/bucket-index-after-resize.svg new file mode 100644 index 000000000000..70329245cabf --- /dev/null +++ b/platform-sdk/docs/proposals/scaling-hdhm-buckets/bucket-index-after-resize.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/platform-sdk/docs/proposals/scaling-hdhm-buckets/bucket-index-before-resize.svg b/platform-sdk/docs/proposals/scaling-hdhm-buckets/bucket-index-before-resize.svg new file mode 100644 index 000000000000..b7d75268da09 --- /dev/null +++ b/platform-sdk/docs/proposals/scaling-hdhm-buckets/bucket-index-before-resize.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/platform-sdk/docs/proposals/scaling-hdhm-buckets/proposal.md b/platform-sdk/docs/proposals/scaling-hdhm-buckets/proposal.md new file mode 100644 index 000000000000..ab54ced6dead --- /dev/null +++ b/platform-sdk/docs/proposals/scaling-hdhm-buckets/proposal.md @@ -0,0 +1,207 @@ +# Scalability - adjusting the number of HalfDiskHashMap buckets + +--- + +## Summary + +### Background + +`HalfDiskHashMap` is a data structure that holds virtual key to virtual path mapping. Virtual paths +are required to locate key/value records in virtual maps. For every key (object) a path (long) is +stored in `HalfDiskHashMap`. All requests like `VirtualMap.get(key)` first get a path for the key, +and then retrieve the value using this path. + +`HalfDiskHashMap` stores key/path mappings in buckets. 
+Every bucket contains the following information:
+* Bucket ID: `0` to `N - 1`, where `N` is the number of buckets
+* A set of bucket entries
+
+Each bucket entry contains:
+* the key hash code
+* the key bytes
+* the value (path)
+
+When a key is added to `HalfDiskHashMap`, the bucket to add it to is found using the key's hash code.
+The number of buckets is always a power of two, so the operation is very cheap and straightforward:
+just `hashCode & (N - 1)`, that is, the lowest bits of the hash code.
+
+Buckets are stored on disk in data files, in the "objectKeyToPath" store. To read and write buckets,
+the store uses a bucket index, which is a mapping from bucket IDs (`0` to `N - 1`) to data locations
+(file number + offset in the file). The index has N entries and takes N * 8 bytes in memory, since
+data locations are longs.
+
+### The problem
+
+The number of buckets is currently fixed and cannot be changed. It is set based on the projected map
+size, on the assumption that a bucket should not contain more than 32 key/path entries. For example,
+if the projected map size is 1_000_000_000, then the number of buckets is about 32M (2^25, to be
+precise). In reality, maps usually contain fewer entries than projected, so some buckets are empty,
+and many buckets contain fewer than 32 entries. However, the whole bucket index for all 32M buckets
+is allocated at node startup, and it may consume a substantial fraction of node RAM.
+
+One approach to reducing the memory allocated for the bucket index is to use a smaller number of
+buckets. For example, for maps estimated to contain 1_000_000_000 entries, use 1M buckets instead of
+32M. That would reduce the index size from 256 MB to 8 MB. While the map is small, this would work
+fine, but over time it would become very slow. When the map really grows to 1B elements, each bucket
+would contain about 1K key/path entries, and key lookups in buckets would be prohibitively expensive.
+
+### Proposed solution
+
+There should be a mechanism to increase the number of buckets when needed. It will let nodes
+start with a small number of buckets initially. As virtual maps grow in size, more buckets will
+be added. This is very similar to what typical in-memory hash maps do.
+
+## Architecture
+
+When a MerkleDb database instance is created, it uses the virtual map projected size hint to
+calculate the size of the bucket index as described above. The index is a mapping between bucket
+IDs and bucket locations on disk:
+
+![Bucket index](bucket-index-before-resize.svg)
+
+Over time, the virtual map grows, and the average number of stored elements per bucket starts to
+exceed a certain threshold. This is when the number of buckets needs to be expanded.
+
+### Expanding bucket index
+
+The number of buckets is always a power of 2. When the bucket index is expanded, the number of
+buckets is doubled to keep it a power of two.
+
+Let's assume the old number of buckets is `2^N`, and the new (updated) number is `2^(N + 1)`, i.e.
+two times larger. For every bucket, two entries are stored in the new bucket index, at positions
+* `bucketID`
+* `bucketID + 2^N`
+where `bucketID` is the lowest `N` bits of the bucket ID. This means two entries in the bucket index
+will point to the same data location on disk.
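+
+To make the doubling step concrete, here is a minimal, illustrative Java sketch (not the actual
+MerkleDb code) of expanding an index kept as a plain array of data locations. It assumes a heap
+array; the real index implementations (heap, off-heap, disk) differ, but the copy pattern is the
+same:
+
+```java
+// Illustrative only: double a bucket index from 2^N to 2^(N + 1) entries so that
+// index[bucketID] and index[bucketID + 2^N] point to the same data location on disk.
+static long[] doubleBucketIndex(final long[] index) {
+    final int oldSize = index.length;               // 2^N buckets
+    final long[] expanded = new long[oldSize * 2];  // 2^(N + 1) buckets
+    System.arraycopy(index, 0, expanded, 0, oldSize);
+    System.arraycopy(index, 0, expanded, oldSize, oldSize);
+    return expanded;
+}
+```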
+
+![Bucket index after resize](bucket-index-after-resize.svg)
+
+### Key lookups after index expanding
+
+This is how key/path lookups in HDHM work today:
+* Get the key hash code
+* Take the N lower bits of the hash code, this is the bucket ID
+* Look up the data location for the bucket ID
+* Load the bucket from disk
+* Iterate over all bucket entries
+* For every entry, check the hash code and key bytes. If they are equal to the key in question,
+return the entry value (path) to the caller, otherwise proceed to the next entry
+
+No changes are needed to these steps after the bucket index is expanded.
+
+### Bucket cleanup after index expanding
+
+After the bucket index is expanded as described, there are two issues to deal with.
+
+First, some bucket IDs are wrong. When a bucket is loaded for bucket ID `X`, it may have actual
+ID `X`, or its ID may be `X` - `2^N`, assuming the index was resized from `2^N` to `2^(N + 1)`.
+
+Second, buckets now contain stale entries that should be removed. All entries were
+stored correctly when the number of buckets was `2^N`. After the resize to `2^(N + 1)`, some entries
+from bucket `X` should stay in bucket `X`, while others should be moved to bucket `X + 2^N`,
+depending on the value of the Nth bit of the entry hash code.
+
+Cleaning up all buckets at once would be too expensive, as there may be too many buckets to process.
+Instead, cleanup is done lazily, when a bucket is updated. When a bucket is loaded from
+disk, all its entries are scanned, and the entry key hash codes (lower `N + 1` bits) are compared to
+the bucket ID; entries that don't match are removed.
+
+### Example
+
+Virtual map size: `200`. Calculated HDHM buckets: `8` (200 / 32, rounded up to a power of two), so
+the bucket index mask is `0b0111`. Assume there is a bucket with ID `5`; it contains keys whose hash
+codes have the lower 3 bits set to `101`. The bucket index contains 8 elements, index[5] == DL5
+(bucket 5 location on disk), and all other index entries are zeroes. Let's also assume bucket 5
+contains two keys, with hash codes `0b01001101` and `0b01000101` (the lower 3 bits of both are
+`101`, which matches the bucket ID).
+
+Now the number of buckets is doubled to 16, and the mask is `0b1111`. The bucket index now has 16
+elements, index[5] and index[13] are both set to DL5, and all other index entries are zeroes.
+
+When a key with hash code `0b01001101` is requested from the HDHM, it will be looked up in bucket
+13 (which is `0b01001101 & 0b1111`). The bucket index entry for this bucket points to DL5. When the
+bucket is loaded from this location, HDHM will iterate over all its entries, find the entry that
+corresponds to the key (both the key hash code and the key bytes match), and return its value
+(path). Similarly, a key with hash code `0b01000101` will be found using bucket 5.
+
+Now let's add a new key to the map, with key hash code `0b10001101`. The corresponding bucket ID is
+13 (hash code & bucket mask). The bucket is loaded from disk, and a new entry is added to it. At the
+same time, the entry for key `0b01000101` is removed from the bucket, since its hash code combined
+with the bucket mask corresponds to bucket 5, not bucket 13. Then bucket 13 with the two entries
+is written to disk, and index[13] is updated to the new disk location.
+
+![Bucket cleanup](bucket-entries-cleanup.svg)
+
+### Threading and synchronization
+
+The bucket index resize has to be synchronized with all code that reads from or
+writes to the index:
+* Key reads from MerkleDb
+* Flushes to MerkleDb
+* Background compaction
+
+Key reads.
During index resize, all index entries from `2^N` to `2^(N + 1) - 1` are set first; only then
+is the current number of buckets updated from `2^N` to `2^(N + 1)`. All reads happening in parallel
+will read from the first half of the index.
+
+Flushes. A data flush is the only time when the key to path mapping is changed in MerkleDb. If the
+bucket index is resized on the same thread, before or after flushing, no changes can be made to the
+buckets during resizing. Flushes are also the only moment when the virtual leaf path range
+(virtual map size) can change. It therefore makes sense to trigger an index resize right after a
+flush.
+
+Background compaction. During compaction, the key to path store content is not changed, but the
+bucket index is modified as buckets are moved from one data file to another. Index resizing and
+compaction must not run in parallel. Note, however, that compaction tasks are only started - if not
+running already - after data flushes. So the final flow is as follows:
+
+* A data flush to MerkleDb is initiated
+* At the end of the flush, check whether key to path compaction is in progress
+* If not, resize the bucket index (if needed)
+* Then start compaction (if needed)
+* If compaction is still in progress, skip the index resize. Sooner or later the compaction task
+  will be done, and the index will be resized at the end of a subsequent flush
+
+## Performance
+
+Expanding the index. The current proposal is very straightforward: double the index size and copy
+elements `0` to `2^N - 1` to positions `2^N` to `2^(N + 1) - 1`. This works for all index
+implementations (heap, off-heap, disk). Expanding from 1M buckets to 2M buckets (which corresponds
+to virtual map sizes of 30-60M) takes 5-10ms. In the future, when disk-based indices are used, a
+more efficient resize implementation may be needed.
+
+Bucket updates to remove stale entries with wrong hash codes. Buckets are sanitized only on write,
+at the end of a flush. Reads are not affected. Real-world experiments show that the extra pass over
+all bucket entries to remove stale ones has no measurable impact on TPS in end-to-end benchmarks.
+
+## Testing
+
+## Other concerns
+
+### State validation
+
+The state validation tool will need to be updated so it doesn't fail when it finds a bucket entry
+with a wrong hash code (one that doesn't correspond to the bucket ID under the current bucket
+mask). Bucket IDs may not correspond to bucket index entries either, and the tool must be able to
+detect that.
+
+### Default virtual map size hints
+
+Current virtual map size hints in Hedera services code are very high. At least four tables in
+TokenService have hints set to 1B. This results in 1 GB allocated for bucket indices right at node
+startup just for these tables. HintsService also has a few maps with size hints set to
+very high values.
+
+With this new mechanism to increase the number of key buckets on the fly, there is no need for
+such large hints. Nodes can start with a low number of buckets, and new buckets will be
+created as needed over time.
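+
+As an illustration of why modest hints are now sufficient, here is a hypothetical Java sketch of how
+an initial bucket count can be derived from a capacity hint and the target average bucket entry
+count (32 by default, see `goodAverageBucketEntryCount`); the exact formula used by
+`HalfDiskHashMap` may differ:
+
+```java
+// Illustrative only: derive the initial number of HDHM buckets from a capacity hint.
+static long initialBucketCount(final long capacityHint, final int goodAverageBucketEntryCount) {
+    final long wanted = Math.max(1, capacityHint / goodAverageBucketEntryCount);
+    // Round up to a power of two, so bucket selection stays `hashCode & (bucketCount - 1)`
+    long bucketCount = 1;
+    while (bucketCount < wanted) {
+        bucketCount <<= 1;
+    }
+    return bucketCount;
+}
+```
+
+With this formula, a hint of `1_000_000` and 32 entries per bucket gives 2^15 buckets (a 256 KB
+index), versus 2^25 buckets (a 256 MB index) for a `1_000_000_000` hint. The resize mechanism above
+covers the case where a map eventually outgrows its hint.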
+ +Current proposal for size hints: +* TokenService.ACCOUNTS: reduce from `1_000_000_000` to `1_000_000` +* TokenService.MINTABLE_NFTS: reduce from `1_000_000_000` to `1_000_000` +* TokenService.TOKENS: reduce from `1_000_000_000` to `1_000_000` +* TokenService.TOKEN_RELS: reduce from `1_000_000_000` to `1_000_000` +* HintsService.CRS_PUBLICATIONS: reduce from `2^31` to `2^10` +* HintsService.HINTS: reduce from `2^31` to `2^10` +* HintsService.PREPROCESSING_VOTES: reduce from `2^31` to `2^10` + +Some other tables will be adjusted accordingly. These changes _will not_ affect existing nodes. +When a node is started from an existing state snapshot, all virtual map size hints are ignored, +and the number of buckets is read from the snapshot. diff --git a/platform-sdk/platform-apps/tests/MigrationTestingTool/src/main/java/com/swirlds/demo/migration/MigrationTestingToolState.java b/platform-sdk/platform-apps/tests/MigrationTestingTool/src/main/java/com/swirlds/demo/migration/MigrationTestingToolState.java index b60e26b582cf..1305ec84cf4e 100644 --- a/platform-sdk/platform-apps/tests/MigrationTestingTool/src/main/java/com/swirlds/demo/migration/MigrationTestingToolState.java +++ b/platform-sdk/platform-apps/tests/MigrationTestingTool/src/main/java/com/swirlds/demo/migration/MigrationTestingToolState.java @@ -28,8 +28,11 @@ @ConstructableIgnored public class MigrationTestingToolState extends MerkleStateRoot implements MerkleNodeState { + private static final Logger logger = LogManager.getLogger(MigrationTestingToolState.class); + private static final long INITIAL_ACCOUNTS_HINT = 1_000_000; + /** * The version history of this class. Versions that have been released must NEVER be given a different value. */ @@ -181,10 +184,7 @@ void genesisInit() { setMerkleMap(new MerkleMap<>()); final MerkleDbConfig merkleDbConfig = configuration.getConfigData(MerkleDbConfig.class); final MerkleDbTableConfig tableConfig = new MerkleDbTableConfig( - (short) 1, - DigestType.SHA_384, - merkleDbConfig.maxNumOfKeys(), - merkleDbConfig.hashesRamToDiskThreshold()); + (short) 1, DigestType.SHA_384, INITIAL_ACCOUNTS_HINT, merkleDbConfig.hashesRamToDiskThreshold()); // to make it work for the multiple node in one JVM case, we need reset the default instance path every time // we create another instance of MerkleDB. MerkleDb.resetDefaultInstancePath(); diff --git a/platform-sdk/swirlds-benchmarks/src/jmh/java/com/swirlds/benchmark/DataFileCollectionBench.java b/platform-sdk/swirlds-benchmarks/src/jmh/java/com/swirlds/benchmark/DataFileCollectionBench.java index 5443470b2e26..28ef02dc9d3f 100644 --- a/platform-sdk/swirlds-benchmarks/src/jmh/java/com/swirlds/benchmark/DataFileCollectionBench.java +++ b/platform-sdk/swirlds-benchmarks/src/jmh/java/com/swirlds/benchmark/DataFileCollectionBench.java @@ -46,6 +46,7 @@ BenchmarkRecord read(long dataLocation) throws IOException { return recordData != null ? 
serializer.deserialize(recordData) : null; } }; + store.updateValidKeyRange(0, maxKey); final var compactor = new DataFileCompactor(dbConfig, storeName, store, index, null, null, null, null); System.out.println(); @@ -60,7 +61,7 @@ BenchmarkRecord read(long dataLocation) throws IOException { index.put(id, store.storeDataItem(record::serialize, BenchmarkRecord.getSerializedSize())); if (verify) map[(int) id] = record; } - store.endWriting(0, maxKey); + store.endWriting(); } System.out.println("Created " + numFiles + " files in " + (System.currentTimeMillis() - start) + "ms"); diff --git a/platform-sdk/swirlds-benchmarks/src/jmh/java/com/swirlds/benchmark/VirtualMapBaseBench.java b/platform-sdk/swirlds-benchmarks/src/jmh/java/com/swirlds/benchmark/VirtualMapBaseBench.java index aab2f849ab92..167563f66b9c 100644 --- a/platform-sdk/swirlds-benchmarks/src/jmh/java/com/swirlds/benchmark/VirtualMapBaseBench.java +++ b/platform-sdk/swirlds-benchmarks/src/jmh/java/com/swirlds/benchmark/VirtualMapBaseBench.java @@ -94,11 +94,9 @@ protected VirtualMap createMap() { protected VirtualMap createEmptyMap(String label) { final MerkleDbConfig merkleDbConfig = getConfig(MerkleDbConfig.class); + // Start with a relatively low virtual map size hint and let MerkleDb resize its HDHM final MerkleDbTableConfig tableConfig = new MerkleDbTableConfig( - (short) 1, - DigestType.SHA_384, - merkleDbConfig.maxNumOfKeys(), - merkleDbConfig.hashesRamToDiskThreshold()); + (short) 1, DigestType.SHA_384, maxKey / 2, merkleDbConfig.hashesRamToDiskThreshold()); MerkleDbDataSourceBuilder dataSourceBuilder = new MerkleDbDataSourceBuilder(tableConfig, configuration); return new VirtualMap<>( label, new BenchmarkKeySerializer(), new BenchmarkValueSerializer(), dataSourceBuilder, configuration); @@ -324,20 +322,20 @@ protected VirtualMap restoreMap(final String label } savedDir = nextSavedDir; } + VirtualMap virtualMap = null; if (savedDir != null) { try { logger.info("Restoring map {} from {}", label, savedDir); - final VirtualMap virtualMap = new VirtualMap<>(configuration); - try (final SerializableDataInputStream in = + virtualMap = new VirtualMap<>(configuration); + try (SerializableDataInputStream in = new SerializableDataInputStream(Files.newInputStream(savedDir.resolve(label + SERDE_SUFFIX)))) { virtualMap.deserialize(in, savedDir, virtualMap.getVersion()); } logger.info("Restored map {} from {}", label, savedDir); - return virtualMap; } catch (IOException ex) { logger.error("Error loading saved map: {}", ex.getMessage()); } } - return null; + return virtualMap; } } diff --git a/platform-sdk/swirlds-merkle/src/timingSensitive/java/com/swirlds/virtual/merkle/map/MapTest.java b/platform-sdk/swirlds-merkle/src/timingSensitive/java/com/swirlds/virtual/merkle/map/MapTest.java index 36010ad7cd84..9a0c6682f626 100644 --- a/platform-sdk/swirlds-merkle/src/timingSensitive/java/com/swirlds/virtual/merkle/map/MapTest.java +++ b/platform-sdk/swirlds-merkle/src/timingSensitive/java/com/swirlds/virtual/merkle/map/MapTest.java @@ -35,6 +35,8 @@ final class MapTest { + private static final long INITIAL_MAP_SIZE = 1_000_000; + private static final Configuration CONFIGURATION = ConfigurationBuilder.create() .withConfigDataType(VirtualMapConfig.class) .withConfigDataType(MerkleDbConfig.class) @@ -43,10 +45,7 @@ final class MapTest { .build(); private static final MerkleDbConfig MERKLE_DB_CONFIG = CONFIGURATION.getConfigData(MerkleDbConfig.class); private static final MerkleDbTableConfig TABLE_CONFIG = new MerkleDbTableConfig( - (short) 1, - 
DigestType.SHA_384, - MERKLE_DB_CONFIG.maxNumOfKeys(), - MERKLE_DB_CONFIG.hashesRamToDiskThreshold()); + (short) 1, DigestType.SHA_384, INITIAL_MAP_SIZE, MERKLE_DB_CONFIG.hashesRamToDiskThreshold()); VirtualDataSourceBuilder createLongBuilder() { return new MerkleDbDataSourceBuilder(TABLE_CONFIG, CONFIGURATION); diff --git a/platform-sdk/swirlds-merkle/src/timingSensitive/java/com/swirlds/virtual/merkle/reconnect/VirtualMapReconnectTestBase.java b/platform-sdk/swirlds-merkle/src/timingSensitive/java/com/swirlds/virtual/merkle/reconnect/VirtualMapReconnectTestBase.java index 6a59e8e85ba9..61b3d8c7400a 100644 --- a/platform-sdk/swirlds-merkle/src/timingSensitive/java/com/swirlds/virtual/merkle/reconnect/VirtualMapReconnectTestBase.java +++ b/platform-sdk/swirlds-merkle/src/timingSensitive/java/com/swirlds/virtual/merkle/reconnect/VirtualMapReconnectTestBase.java @@ -21,7 +21,6 @@ import com.swirlds.merkledb.MerkleDb; import com.swirlds.merkledb.MerkleDbDataSourceBuilder; import com.swirlds.merkledb.MerkleDbTableConfig; -import com.swirlds.merkledb.config.MerkleDbConfig; import com.swirlds.virtual.merkle.TestKey; import com.swirlds.virtual.merkle.TestKeySerializer; import com.swirlds.virtual.merkle.TestValue; @@ -95,9 +94,7 @@ VirtualDataSourceBuilder createBuilder() throws IOException { // MerkleDb instance, so let's use a new (temp) database location for every run final Path defaultVirtualMapPath = LegacyTemporaryFileBuilder.buildTemporaryFile(CONFIGURATION); MerkleDb.setDefaultPath(defaultVirtualMapPath); - final MerkleDbConfig merkleDbConfig = CONFIGURATION.getConfigData(MerkleDbConfig.class); - final MerkleDbTableConfig tableConfig = - new MerkleDbTableConfig((short) 1, DigestType.SHA_384, merkleDbConfig.maxNumOfKeys(), 0); + final MerkleDbTableConfig tableConfig = new MerkleDbTableConfig((short) 1, DigestType.SHA_384, 1_000_000, 0); return new MerkleDbDataSourceBuilder(tableConfig, CONFIGURATION); } diff --git a/platform-sdk/swirlds-merkledb/src/hammer/java/com/swirlds/merkledb/files/DataFileCollectionCompactionHammerTest.java b/platform-sdk/swirlds-merkledb/src/hammer/java/com/swirlds/merkledb/files/DataFileCollectionCompactionHammerTest.java index d825b8229096..d888e66244e2 100644 --- a/platform-sdk/swirlds-merkledb/src/hammer/java/com/swirlds/merkledb/files/DataFileCollectionCompactionHammerTest.java +++ b/platform-sdk/swirlds-merkledb/src/hammer/java/com/swirlds/merkledb/files/DataFileCollectionCompactionHammerTest.java @@ -82,7 +82,8 @@ void benchmark(int numFiles, int maxEntriesPerFile) throws IOException { }, 2 * Long.BYTES)); } - coll.endWriting(index.size() * 2L - 1, index.size() * 2L); + coll.updateValidKeyRange(index.size() * 2L - 1, index.size() * 2L); + coll.endWriting(); } final long start = System.currentTimeMillis(); @@ -148,7 +149,8 @@ void hammer() throws IOException, InterruptedException, ExecutionException { }, 2 * Long.BYTES)); } - coll.endWriting(index.size() * 2L - 1, index.size() * 2L); + coll.updateValidKeyRange(index.size() * 2L - 1, index.size() * 2L); + coll.endWriting(); } return null; }); diff --git a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/MerkleDbCompactionCoordinator.java b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/MerkleDbCompactionCoordinator.java index 38376515f563..d93269cf13b4 100644 --- a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/MerkleDbCompactionCoordinator.java +++ b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/MerkleDbCompactionCoordinator.java @@ -156,8 
+156,8 @@ public synchronized void compactIfNotRunningYet(final String key, final DataFile if (!compactionEnabled) { return; } - if (compactorsByName.containsKey(key)) { - logger.debug(MERKLE_DB.getMarker(), "Compaction for {} is already in progress", key); + if (isCompactionRunning(key)) { + logger.info(MERKLE_DB.getMarker(), "Compaction for {} is already in progress", key); return; } compactorsByName.put(key, compactor); diff --git a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/MerkleDbDataSource.java b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/MerkleDbDataSource.java index 15eb4f694e0e..9df0d954f1a4 100644 --- a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/MerkleDbDataSource.java +++ b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/MerkleDbDataSource.java @@ -234,12 +234,12 @@ public MerkleDbDataSource( } saveMetadata(dbPaths); - // Max number of entities that can be stored - final long virtualSize = tableConfig.getMaxNumberOfKeys(); - // Path to hash and path to KV index capacity. Last leaf path is 2*virtualSize - 2, - // so capacity should be 2*virtualSize - 1, but only if virtualSize is greater than 1. - // To support virtual sizes 0 and 1, let's set capacity to 2*virtualSize - final long pathIndexCapacity = virtualSize * 2; + // Get the max number of keys is set in the MerkleDb config, then multiply it by + // two, since virtual path range is 2 times number of keys stored in a virtual map. + // Use it as a path to hash and path to KV index capacity. Index capacity limits + // the max size of the index, but it doesn't have anything to do with index initial + // size. If a new MerkleDb instance is created, both path indices will have size 0 + final long pathIndexCapacity = merkleDbConfig.maxNumOfKeys() * 2; final boolean forceIndexRebuilding = merkleDbConfig.indexRebuildingEnforced(); // Path to disk location index, hashes @@ -342,7 +342,7 @@ public MerkleDbDataSource( String keyToPathStoreName = tableName + "_objectkeytopath"; keyToPath = new HalfDiskHashMap( config, - tableConfig.getMaxNumberOfKeys(), + tableConfig.getInitialCapacity(), dbPaths.keyToPathDirectory, keyToPathStoreName, tableName + ":objectKeyToPath", @@ -377,10 +377,10 @@ public MerkleDbDataSource( logger.info( MERKLE_DB.getMarker(), - "Created MerkleDB [{}] with store path '{}', maxNumKeys = {}, hash RAM/disk cutoff" + " = {}", + "Created MerkleDB [{}] with store path '{}', initial capacity = {}, hash RAM/disk cutoff" + " = {}", tableName, storageDir, - tableConfig.getMaxNumberOfKeys(), + tableConfig.getInitialCapacity(), tableConfig.getHashesRamToDiskThreshold()); } @@ -869,7 +869,7 @@ public void snapshot(final Path snapshotDirectory) throws IOException, IllegalSt @Override public String toString() { return new ToStringBuilder(this) - .append("maxNumberOfKeys", tableConfig.getMaxNumberOfKeys()) + .append("initialCapacity", tableConfig.getInitialCapacity()) .append("preferDiskBasedIndexes", preferDiskBasedIndices) .append("pathToDiskLocationInternalNodes.size", pathToDiskLocationInternalNodes.size()) .append("pathToDiskLocationLeafNodes.size", pathToDiskLocationLeafNodes.size()) @@ -926,8 +926,8 @@ Path getStorageDir() { } // For testing purpose - long getMaxNumberOfKeys() { - return tableConfig.getMaxNumberOfKeys(); + long getInitialCapacity() { + return tableConfig.getInitialCapacity(); } // For testing purpose @@ -1199,9 +1199,14 @@ private void writeLeavesToPathToKeyValue( // end writing final DataFileReader pathToKeyValueReader = 
pathToKeyValue.endWriting(); statisticsUpdater.setFlushLeavesStoreFileSize(pathToKeyValueReader); - compactionCoordinator.compactIfNotRunningYet(DataFileCompactor.PATH_TO_KEY_VALUE, newPathToKeyValueCompactor()); final DataFileReader keyToPathReader = keyToPath.endWriting(); statisticsUpdater.setFlushLeafKeysStoreFileSize(keyToPathReader); + + if (!compactionCoordinator.isCompactionRunning(DataFileCompactor.OBJECT_KEY_TO_PATH)) { + keyToPath.resizeIfNeeded(firstLeafPath, lastLeafPath); + } + + compactionCoordinator.compactIfNotRunningYet(DataFileCompactor.PATH_TO_KEY_VALUE, newPathToKeyValueCompactor()); compactionCoordinator.compactIfNotRunningYet(DataFileCompactor.OBJECT_KEY_TO_PATH, newKeyToPathCompactor()); } diff --git a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/MerkleDbTableConfig.java b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/MerkleDbTableConfig.java index 621fce4bf703..92141b8a9680 100644 --- a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/MerkleDbTableConfig.java +++ b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/MerkleDbTableConfig.java @@ -59,8 +59,8 @@ private static final class ClassVersion { private static final FieldDefinition FIELD_TABLECONFIG_PREFERDISKINDICES = new FieldDefinition("preferDiskIndices", FieldType.UINT32, false, true, false, 7); - private static final FieldDefinition FIELD_TABLECONFIG_MAXNUMBEROFKEYS = - new FieldDefinition("maxNumberOfKeys", FieldType.UINT64, false, true, false, 8); + private static final FieldDefinition FIELD_TABLECONFIG_INITIALCAPACITY = + new FieldDefinition("initialCapacity", FieldType.UINT64, false, true, false, 8); private static final FieldDefinition FIELD_TABLECONFIG_HASHRAMTODISKTHRESHOLD = new FieldDefinition("hashesRamToDiskThreshold", FieldType.UINT64, false, true, false, 9); @@ -106,7 +106,7 @@ private static final class ClassVersion { /** * Max number of keys that can be stored in a table. */ - private long maxNumberOfKeys = 0; + private long initialCapacity; /** * Threshold where we switch from storing internal hashes in ram to storing them on disk. If it is 0 then everything @@ -114,7 +114,7 @@ private static final class ClassVersion { * we swap from ram to disk. This allows a tree where the lower levels of the tree nodes hashes are in ram and the * upper larger less changing layers are on disk. */ - private long hashesRamToDiskThreshold = 0; + private long hashesRamToDiskThreshold; /** * Creates a new virtual table config with default values. This constructor should only be used @@ -131,15 +131,15 @@ public MerkleDbTableConfig() { * Hash version * @param hashType * Hash type - * @param maxNumberOfKeys - * Max number of keys that can be stored in a table. + * @param initialCapacity + * Initial database capacity. May be used to calculate index sizes * @param hashesRamToDiskThreshold * Threshold where we switch from storing internal hashes in ram to storing them on disk. */ public MerkleDbTableConfig( final short hashVersion, final DigestType hashType, - final long maxNumberOfKeys, + final long initialCapacity, final long hashesRamToDiskThreshold) { // Mandatory fields this.hashVersion = hashVersion; @@ -147,10 +147,10 @@ public MerkleDbTableConfig( // Optional hints, may be set explicitly using setters later. 
Defaults are loaded from // MerkleDb configuration - if (maxNumberOfKeys <= 0) { - throw new IllegalArgumentException("Max number of keys must be greater than 0"); + if (initialCapacity <= 0) { + throw new IllegalArgumentException("Initial capacity must be greater than 0"); } - this.maxNumberOfKeys = maxNumberOfKeys; + this.initialCapacity = initialCapacity; if (hashesRamToDiskThreshold < 0) { throw new IllegalArgumentException("Hashes RAM/disk threshold must be greater or equal to 0"); } @@ -165,7 +165,7 @@ public MerkleDbTableConfig(final ReadableSequentialData in) { // of them are protobuf default and aren't present) hashVersion = 0; hashType = DigestType.SHA_384; - maxNumberOfKeys = 0; + initialCapacity = 0; hashesRamToDiskThreshold = 0; while (in.hasRemaining()) { @@ -191,8 +191,8 @@ public MerkleDbTableConfig(final ReadableSequentialData in) { } else if (fieldNum == FIELD_TABLECONFIG_PREFERDISKINDICES.number()) { // Skip preferDiskIndices in.readVarInt(false); - } else if (fieldNum == FIELD_TABLECONFIG_MAXNUMBEROFKEYS.number()) { - maxNumberOfKeys = in.readVarLong(false); + } else if (fieldNum == FIELD_TABLECONFIG_INITIALCAPACITY.number()) { + initialCapacity = in.readVarLong(false); } else if (fieldNum == FIELD_TABLECONFIG_HASHRAMTODISKTHRESHOLD.number()) { hashesRamToDiskThreshold = in.readVarLong(false); } else { @@ -202,8 +202,8 @@ public MerkleDbTableConfig(final ReadableSequentialData in) { // Check that all mandatory fields have been loaded from the stream requireNonNull(hashType, "Null or wrong hash type"); - if (maxNumberOfKeys <= 0) { - throw new IllegalArgumentException("Missing or wrong max number of keys"); + if (initialCapacity <= 0) { + throw new IllegalArgumentException("Missing or wrong initial capacity"); } } @@ -216,10 +216,10 @@ public int pbjSizeInBytes() { } size += ProtoWriterTools.sizeOfTag(FIELD_TABLECONFIG_DIGESTTYPEID, ProtoConstants.WIRE_TYPE_VARINT_OR_ZIGZAG); size += ProtoWriterTools.sizeOfVarInt32(hashType.id()); - assert maxNumberOfKeys != 0; + assert initialCapacity != 0; size += ProtoWriterTools.sizeOfTag( - FIELD_TABLECONFIG_MAXNUMBEROFKEYS, ProtoConstants.WIRE_TYPE_VARINT_OR_ZIGZAG); - size += ProtoWriterTools.sizeOfVarInt64(maxNumberOfKeys); + FIELD_TABLECONFIG_INITIALCAPACITY, ProtoConstants.WIRE_TYPE_VARINT_OR_ZIGZAG); + size += ProtoWriterTools.sizeOfVarInt64(initialCapacity); if (hashesRamToDiskThreshold != 0) { size += ProtoWriterTools.sizeOfTag( FIELD_TABLECONFIG_HASHRAMTODISKTHRESHOLD, ProtoConstants.WIRE_TYPE_VARINT_OR_ZIGZAG); @@ -235,9 +235,9 @@ public void writeTo(final WritableSequentialData out) { } ProtoWriterTools.writeTag(out, FIELD_TABLECONFIG_DIGESTTYPEID); out.writeVarInt(hashType.id(), false); - assert maxNumberOfKeys != 0; - ProtoWriterTools.writeTag(out, FIELD_TABLECONFIG_MAXNUMBEROFKEYS); - out.writeVarLong(maxNumberOfKeys, false); + assert initialCapacity != 0; + ProtoWriterTools.writeTag(out, FIELD_TABLECONFIG_INITIALCAPACITY); + out.writeVarLong(initialCapacity, false); if (hashesRamToDiskThreshold != 0) { ProtoWriterTools.writeTag(out, FIELD_TABLECONFIG_HASHRAMTODISKTHRESHOLD); out.writeVarLong(hashesRamToDiskThreshold, false); @@ -287,13 +287,13 @@ public ValueSerializer getValueSerializer() { } /** - * Max number of keys that can be stored in the table. + * Initial database capacity. 
* * @return - * Max number of keys + * Initial database capacity */ - public long getMaxNumberOfKeys() { - return maxNumberOfKeys; + public long getInitialCapacity() { + return initialCapacity; } /** @@ -329,7 +329,7 @@ public int getVersion() { @Override public void serialize(final SerializableDataOutputStream out) throws IOException { out.writeBoolean(false); // prefer disk indices - out.writeLong(maxNumberOfKeys); + out.writeLong(initialCapacity); out.writeLong(hashesRamToDiskThreshold); out.writeShort(hashVersion); out.writeInt(hashType.id()); @@ -345,7 +345,7 @@ public void serialize(final SerializableDataOutputStream out) throws IOException @Override public void deserialize(final SerializableDataInputStream in, final int version) throws IOException { in.readBoolean(); // prefer disk indices - maxNumberOfKeys = in.readLong(); + initialCapacity = in.readLong(); hashesRamToDiskThreshold = in.readLong(); hashVersion = in.readShort(); hashType = DigestType.valueOf(in.readInt()); @@ -361,7 +361,7 @@ public void deserialize(final SerializableDataInputStream in, final int version) * @return Table config copy */ public MerkleDbTableConfig copy() { - return new MerkleDbTableConfig(hashVersion, hashType, maxNumberOfKeys, hashesRamToDiskThreshold); + return new MerkleDbTableConfig(hashVersion, hashType, initialCapacity, hashesRamToDiskThreshold); } /** @@ -369,7 +369,7 @@ public MerkleDbTableConfig copy() { */ @Override public int hashCode() { - return Objects.hash(hashVersion, hashType, maxNumberOfKeys, hashesRamToDiskThreshold); + return Objects.hash(hashVersion, hashType, initialCapacity, hashesRamToDiskThreshold); } /** @@ -380,7 +380,7 @@ public boolean equals(final Object o) { if (!(o instanceof MerkleDbTableConfig other)) { return false; } - return (maxNumberOfKeys == other.maxNumberOfKeys) + return (initialCapacity == other.initialCapacity) && (hashesRamToDiskThreshold == other.hashesRamToDiskThreshold) && (hashVersion == other.hashVersion) && Objects.equals(hashType, other.hashType); diff --git a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/config/MerkleDbConfig.java b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/config/MerkleDbConfig.java index 7139386317a7..1e5b84af5a5f 100644 --- a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/config/MerkleDbConfig.java +++ b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/config/MerkleDbConfig.java @@ -16,18 +16,14 @@ * Instance-wide config for {@code MerkleDbDataSource}. * * @param maxNumOfKeys - * Get the maximum number of unique keys we expect to be stored in this database. This is used for - * calculating in memory index sizes. IMPORTANT: This can only be set before a new database is created, changing - * on an existing database will break it. - * @param size - * Reserved for future use. + * The maximum number of unique keys to be stored in a database. * @param hashesRamToDiskThreshold - * Get threshold where we switch from storing node hashes in ram to - * storing them on disk. If it is 0 then everything is on disk, if it is Long.MAX_VALUE then everything is in ram. - * Any value in the middle is the path value at - * which we swap from ram to disk. This allows a tree where the lower levels of the tree nodes hashes are in ram - * and the upper larger less changing layers are on disk. IMPORTANT: This can only be set before a new database is - * created, changing on an existing database will break it. 
+ * Get threshold where we switch from storing node hashes in ram to + * storing them on disk. If it is 0 then everything is on disk, if it is Long.MAX_VALUE then everything is in ram. + * Any value in the middle is the path value at + * which we swap from ram to disk. This allows a tree where the lower levels of the tree nodes hashes are in ram + * and the upper larger less changing layers are on disk. IMPORTANT: This can only be set before a new database is + * created, changing on an existing database will break it. * @param hashStoreRamBufferSize * Number of hashes to store in a single buffer in HashListByteBuffer. * @param hashStoreRamOffHeapBuffers @@ -37,19 +33,23 @@ * @param longListReservedBufferSize * Length of a reserved buffer in long lists. Value in bytes. * @param minNumberOfFilesInCompaction - * The minimum number of files before we do a compaction. If there are less than this number then it is - * acceptable to not do a compaction. + * The minimum number of files before we do a compaction. If there are less than this number then it is + * acceptable to not do a compaction. * @param iteratorInputBufferBytes * Size of buffer used by data file iterators, in bytes. * @param reconnectKeyLeakMitigationEnabled - * There currently exists a bug when a virtual map is reconnected that can - * cause some deleted keys to leak into the datasource. If this method returns true then a mitigation strategy is - * used when a leaked key is encountered, which hides the problem from the perspective of the application. This - * setting exists so that we can test behavior with and without this mitigation enabled. This mitigation should - * always be enabled in production environments. + * There currently exists a bug when a virtual map is reconnected that can + * cause some deleted keys to leak into the datasource. If this method returns true then a mitigation strategy is + * used when a leaked key is encountered, which hides the problem from the perspective of the application. This + * setting exists so that we can test behavior with and without this mitigation enabled. This mitigation should + * always be enabled in production environments. * @param indexRebuildingEnforced - * Configuration used to avoid reading stored indexes from a saved state and enforce rebuilding those indexes from - * data files. + * Configuration used to avoid reading stored indexes from a saved state and enforce rebuilding those indexes from + * data files. + * @param goodAverageBucketEntryCount + * Target average number of entries in HalfDiskHashMap buckets. This number is used to calculate the number + * of buckets to allocate based on projected virtual map size, and also to check if it's time to double the + * number of HalfDiskHashMap buckets. * @param tablesToRepairHdhm * Comma-delimited list of data source names, may be empty. When a MerkleDb data source with a name from the * list is loaded from a snapshot, its key to path map will be rebuilt from path to KV data files. 
Note that @@ -70,8 +70,7 @@ */ @ConfigData("merkleDb") public record MerkleDbConfig( - @Positive @ConfigProperty(defaultValue = "500000000") long maxNumOfKeys, - @Positive @ConfigProperty(defaultValue = "" + 4_000_000_000L) long size, + @Positive @ConfigProperty(defaultValue = "4000000000") long maxNumOfKeys, @Min(0) @ConfigProperty(defaultValue = "8388608") long hashesRamToDiskThreshold, @Positive @ConfigProperty(defaultValue = "1000000") int hashStoreRamBufferSize, @ConfigProperty(defaultValue = "true") boolean hashStoreRamOffHeapBuffers, @@ -85,6 +84,7 @@ public record MerkleDbConfig( @Positive @ConfigProperty(defaultValue = "16777216") int iteratorInputBufferBytes, @ConfigProperty(defaultValue = "false") boolean reconnectKeyLeakMitigationEnabled, @ConfigProperty(defaultValue = "false") boolean indexRebuildingEnforced, + @ConfigProperty(defaultValue = "32") int goodAverageBucketEntryCount, @ConfigProperty(defaultValue = "") String tablesToRepairHdhm, @ConfigProperty(defaultValue = "75.0") double percentHalfDiskHashMapFlushThreads, @ConfigProperty(defaultValue = "-1") int numHalfDiskHashMapFlushThreads, diff --git a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/DataFileCollection.java b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/DataFileCollection.java index 055f3ff7e201..aa1d00324418 100644 --- a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/DataFileCollection.java +++ b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/DataFileCollection.java @@ -119,7 +119,7 @@ public class DataFileCollection implements FileStatisticAware, Snapshotable { /** * The list of current files in this data file collection. The files are added to this list - * during flushes in {@link #endWriting(long, long)}, after the file is completely written. They + * during flushes in {@link #endWriting()}, after the file is completely written. They * are also added during compaction in {@link DataFileCompactor#compactFiles(CASableLongIndex, List, int)}, even * before compaction is complete. In the end of compaction, all the compacted files are removed * from this list. @@ -387,15 +387,10 @@ public long storeDataItem(final Consumer dataItemWriter, final int * as completed (fully written, read only, and ready for compaction), so any indexes or other data structures that * are in-sync with the file content should be updated before calling this method. * - * @param minimumValidKey The minimum valid data key at this point in time, can be used for - * cleaning out old data - * @param maximumValidKey The maximum valid data key at this point in time, can be used for - * cleaning out old data * @return data file reader for the file written * @throws IOException If there was a problem closing the data file */ - public DataFileReader endWriting(final long minimumValidKey, final long maximumValidKey) throws IOException { - validKeyRange = new KeyRange(minimumValidKey, maximumValidKey); + public DataFileReader endWriting() throws IOException { final DataFileWriter dataWriter = currentDataFileWriter.getAndSet(null); if (dataWriter == null) { throw new IOException("Tried to end writing when we never started writing."); @@ -411,6 +406,18 @@ public DataFileReader endWriting(final long minimumValidKey, final long maximumV return dataReader; } + /** + * Sets the current key range for this file collection. 
+ * + * @param minimumValidKey The minimum valid data key at this point in time, can be used for + * cleaning out old data + * @param maximumValidKey The maximum valid data key at this point in time, can be used for + * cleaning out old data + */ + public void updateValidKeyRange(final long minimumValidKey, final long maximumValidKey) { + validKeyRange = new KeyRange(minimumValidKey, maximumValidKey); + } + /** * Gets the data file reader for a given data location. This method checks that a file with * the specified index exists in this file collection, and that the file is open. Note, diff --git a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/DataFileCommon.java b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/DataFileCommon.java index 1221db4e324e..41bab119c137 100644 --- a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/DataFileCommon.java +++ b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/DataFileCommon.java @@ -132,6 +132,9 @@ public static long dataLocation(final int fileIndex, final long byteOffset) { * @return String with split file and offset */ public static String dataLocationToString(final long dataLocation) { + if (dataLocation <= 0) { + return String.valueOf(dataLocation); + } return "{" + fileIndexFromDataLocation(dataLocation) + "," + byteOffsetFromDataLocation(dataLocation) + "}"; } diff --git a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/MemoryIndexDiskKeyValueStore.java b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/MemoryIndexDiskKeyValueStore.java index 9a0f9fa8e931..9db9b7108d18 100644 --- a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/MemoryIndexDiskKeyValueStore.java +++ b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/MemoryIndexDiskKeyValueStore.java @@ -84,6 +84,7 @@ public MemoryIndexDiskKeyValueStore( public void updateValidKeyRange(final long min, final long max) { // By calling `updateMinValidIndex` we compact the index if it's applicable. 
index.updateValidRange(min, max); + // Data file collection key range is updated in endWriting() } /** @@ -121,7 +122,8 @@ public void put(final long key, final Consumer dataItemWriter, fin public DataFileReader endWriting() throws IOException { final long currentMinValidKey = index.getMinValidIndex(); final long currentMaxValidKey = index.getMaxValidIndex(); - final DataFileReader dataFileReader = fileCollection.endWriting(currentMinValidKey, currentMaxValidKey); + fileCollection.updateValidKeyRange(currentMinValidKey, currentMaxValidKey); + final DataFileReader dataFileReader = fileCollection.endWriting(); logger.info( MERKLE_DB.getMarker(), "{} Ended writing, newFile={}, numOfFiles={}, minimumValidKey={}, maximumValidKey={}", diff --git a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/Bucket.java b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/Bucket.java index b5758c2d1044..b335136eadd6 100644 --- a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/Bucket.java +++ b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/Bucket.java @@ -82,7 +82,7 @@ public sealed class Bucket implements Closeable permits ParsedBucket { */ protected final ReusableBucketPool bucketPool; - private BufferedData bucketData; + private volatile BufferedData bucketData; private volatile long bucketIndexFieldOffset = 0; @@ -145,7 +145,7 @@ public int getBucketIndex() { } /** Set the index for this bucket */ - public void setBucketIndex(int index) { + public void setBucketIndex(final int index) { bucketData.position(bucketIndexFieldOffset); ProtoWriterTools.writeTag(bucketData, FIELD_BUCKET_INDEX); bucketData.writeInt(index); @@ -290,6 +290,7 @@ public void readFrom(final ReadableSequentialData in) { in.readBytes(bucketData); bucketData.flip(); + bucketIndexFieldOffset = 0; entryCount = 0; while (bucketData.hasRemaining()) { final long fieldOffset = bucketData.position(); @@ -315,14 +316,107 @@ public void readFrom(final ReadableSequentialData in) { throw new IllegalArgumentException("Unknown bucket field: " + fieldNum); } } + checkLargestBucket(entryCount); } + private static int readBucketEntryHashCode(final ReadableSequentialData in) { + while (in.hasRemaining()) { + final int tag = in.readVarInt(false); + final int fieldNum = tag >> TAG_FIELD_OFFSET; + if (fieldNum == FIELD_BUCKETENTRY_HASHCODE.number()) { + return in.readInt(); + } else if (fieldNum == FIELD_BUCKETENTRY_VALUE.number()) { + in.readLong(); + } else if (fieldNum == FIELD_BUCKETENTRY_KEYBYTES.number()) { + final int bytesSize = in.readVarInt(false); + in.skip(bytesSize); + } else { + throw new IllegalArgumentException("Unknown bucket entry field: " + fieldNum); + } + } + throw new IllegalArgumentException("No bucket entry hash code found"); + } + public void writeTo(final WritableSequentialData out) { bucketData.resetPosition(); out.writeBytes(bucketData); } + /** + * First, this method updates bucket index of the current bucket to the given value. Second, + * it iterates over all bucket entries and runs a check against entry hash codes. If the lower + * specified number of bits of entry hash code are equal to the bucket index, the entry is + * retained in the bucket, otherwise it's removed. + * + *
<p>
This method is used by {@link HalfDiskHashMap} after resize. During resize, no bucket + * data is copied anywhere, but only bucket index entries are updated. It leads to some buckets + * to have wrong numbers (some lower bits match, but higher bits are different). Besides that, + * some bucket entries may not be valid. For example, an entry may be valid for a bucket with + * mask 0b0111, but when the mask becomes 0b1111 as a result of map resize, the entry may now + * belong to a bucket with a different number. This method removes all such entries. + * + * @param expectedIndex Bucket index to set to this bucket + * @param expectedMaskBits Bucket mask bits to validate all bucket entries against + */ + public void sanitize(final int expectedIndex, final int expectedMaskBits) { + final int expectedMask = (1 << expectedMaskBits) - 1; + bucketData.resetPosition(); + long srcIndex = 0; + long dstIndex = 0; + while (bucketData.hasRemaining()) { + final long fieldOffset = bucketData.position(); + final int tag = bucketData.readVarInt(false); + final int fieldNum = tag >> TAG_FIELD_OFFSET; + if (fieldNum == FIELD_BUCKET_INDEX.number()) { + bucketData.writeInt(expectedIndex); + final long fieldLenWithTag = bucketData.position() - fieldOffset; + srcIndex += fieldLenWithTag; + dstIndex += fieldLenWithTag; + } else if (fieldNum == FIELD_BUCKET_ENTRIES.number()) { + final int entrySize = bucketData.readVarInt(false); + final long nextEntryOffset = bucketData.position() + entrySize; + final long entryLenWithTag = nextEntryOffset - fieldOffset; + final long oldLimit = bucketData.limit(); + bucketData.limit(nextEntryOffset); + final int entryHashCode = readBucketEntryHashCode(bucketData); + bucketData.limit(oldLimit); + if ((entryHashCode & expectedMask) == expectedIndex) { + copyBucketDataBytes(srcIndex, dstIndex, entryLenWithTag); + dstIndex += entryLenWithTag; + } + srcIndex += entryLenWithTag; + bucketData.position(nextEntryOffset); + } + } + bucketData.position(0); + bucketData.limit(dstIndex); + } + + /** + * Copies len {@link #bucketData} bytes from src offset to dst offset. + * + *
<p>
If src and dst offsets are equal, this method is a no-op. This method makes no + * checks against the length and the offsets, assuming they are within bucket data + * buffer limits. + */ + private void copyBucketDataBytes(final long src, final long dst, final long len) { + if (src == dst) { + return; + } + final long limit = bucketData.limit(); + final long pos = bucketData.position(); + final BufferedData srcBuf = bucketData.slice(src, len); + try { + bucketData.position(dst); + bucketData.limit(dst + len); + bucketData.writeBytes(srcBuf); + } finally { + bucketData.limit(limit); + bucketData.position(pos); + } + } + // ================================================================================================================= // Private API diff --git a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/BucketSerializer.java b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/BucketSerializer.java deleted file mode 100644 index 11d2a37b2336..000000000000 --- a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/BucketSerializer.java +++ /dev/null @@ -1,42 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -package com.swirlds.merkledb.files.hashmap; - -import com.hedera.pbj.runtime.io.ReadableSequentialData; -import com.hedera.pbj.runtime.io.WritableSequentialData; -import edu.umd.cs.findbugs.annotations.NonNull; - -/** - * Serializer for writing buckets into a DataFile. - */ -public class BucketSerializer { - - /** Bucket pool used by this serializer */ - private final ReusableBucketPool reusableBucketPool; - - public BucketSerializer() { - reusableBucketPool = new ReusableBucketPool(Bucket::new); - } - - /** - * Reusable bucket pool for this bucket serializer. - * - * @return This serializer's reusable bucket pool. - */ - public ReusableBucketPool getBucketPool() { - return reusableBucketPool; - } - - public int getSerializedSize(final Bucket bucket) { - return bucket.sizeInBytes(); - } - - public void serialize(@NonNull final Bucket bucket, @NonNull final WritableSequentialData out) { - bucket.writeTo(out); - } - - public Bucket deserialize(@NonNull final ReadableSequentialData in) { - final Bucket bucket = reusableBucketPool.getBucket(); - bucket.readFrom(in); - return bucket; - } -} diff --git a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/HalfDiskHashMap.java b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/HalfDiskHashMap.java index 68ddfeab7a9b..9574bacdb5fc 100644 --- a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/HalfDiskHashMap.java +++ b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/HalfDiskHashMap.java @@ -18,6 +18,7 @@ import com.swirlds.merkledb.config.MerkleDbConfig; import com.swirlds.merkledb.files.DataFileCollection; import com.swirlds.merkledb.files.DataFileCollection.LoadedDataCallback; +import com.swirlds.merkledb.files.DataFileCommon; import com.swirlds.merkledb.files.DataFileReader; import com.swirlds.merkledb.files.MemoryIndexDiskKeyValueStore; import com.swirlds.virtualmap.datasource.VirtualLeafBytes; @@ -70,19 +71,18 @@ public class HalfDiskHashMap implements AutoCloseable, Snapshotable, FileStatist protected static final long INVALID_VALUE = Long.MIN_VALUE; /** - * This is the average number of entries per bucket we aim for when filled to mapSize. It is a - * heuristic used in calculation for how many buckets to create. 
The larger this number the - * slower lookups will be but the more even distribution of entries across buckets will be. So - * it is a matter of balance. + * The average number of entries per bucket we aim for. When map size grows and + * starts to exceed the number of buckets times this average number of entries, the + * map is resized by doubling the number of buckets. */ - private static final long GOOD_AVERAGE_BUCKET_ENTRY_COUNT = 32; + private final int goodAverageBucketEntryCount; /** The limit on the number of concurrent read tasks in {@code endWriting()} */ private static final int MAX_IN_FLIGHT = 1024; /** Platform configuration */ @NonNull - private final MerkleDbConfig merkleDbConfig; + private final Configuration config; /** * Long list used for mapping bucketIndex(index into list) to disk location for latest copy of @@ -96,7 +96,9 @@ public class HalfDiskHashMap implements AutoCloseable, Snapshotable, FileStatist * This is the next power of 2 bigger than minimumBuckets. It needs to be a power of two, so * that we can optimize and avoid the cost of doing a % to find the bucket index from hash code. */ - private final int numOfBuckets; + private final AtomicInteger numOfBuckets = new AtomicInteger(); + + private final AtomicInteger bucketMaskBits = new AtomicInteger(0); /** The name to use for the files prefix on disk */ private final String storeName; @@ -117,7 +119,7 @@ public class HalfDiskHashMap implements AutoCloseable, Snapshotable, FileStatist private final AtomicReference currentSubmitTask = new AtomicReference<>(); /** Number of buckets updated during flush */ - private final AtomicInteger numBuckets = new AtomicInteger(); + private final AtomicInteger updatedBucketsCount = new AtomicInteger(); /** * Number of bucket tasks that can be scheduled at the moment, i.e. MAX_IN_FLIGHT minus @@ -134,7 +136,7 @@ public class HalfDiskHashMap implements AutoCloseable, Snapshotable, FileStatist /** * Number of "store bucket" tasks created so far in the current flush. This counter is - * compared against {@link #numBuckets} to manage the first and the last "store bucket" + * compared against {@link #updatedBucketsCount} to manage the first and the last "store bucket" * task dependencies */ private final AtomicInteger storeBucketTasksCreated = new AtomicInteger(); @@ -159,14 +161,14 @@ public class HalfDiskHashMap implements AutoCloseable, Snapshotable, FileStatist *
* FUTURE WORK: it can be moved to MerkleDb. */ - private static ForkJoinPool getFlushingPool(final @NonNull MerkleDbConfig merkleDbConfig) { - requireNonNull(merkleDbConfig); - + private static ForkJoinPool getFlushingPool(final @NonNull Configuration config) { + requireNonNull(config); ForkJoinPool pool = flushingPool; if (pool == null) { synchronized (HalfDiskHashMap.class) { pool = flushingPool; if (pool == null) { + final MerkleDbConfig merkleDbConfig = config.getConfigData(MerkleDbConfig.class); final int hashingThreadCount = merkleDbConfig.getNumHalfDiskHashMapFlushThreads(); pool = new ForkJoinPool(hashingThreadCount); flushingPool = pool; @@ -180,8 +182,10 @@ private static ForkJoinPool getFlushingPool(final @NonNull MerkleDbConfig merkle * Construct a new HalfDiskHashMap * * @param configuration Platform configuration. - * @param mapSize The maximum map number of entries. This should be more than big enough to - * avoid too many key collisions. + * @param initialCapacity Initial map capacity. This should be more than big enough to avoid too + * many key collisions. This capacity is used to calculate the initial number + * of key buckets to store key to path entries. This number of buckets will + * then grow over time as needed * @param storeDir The directory to use for storing data files. * @param storeName The name for the data store, this allows more than one data store in a * single directory. @@ -196,14 +200,19 @@ private static ForkJoinPool getFlushingPool(final @NonNull MerkleDbConfig merkle */ public HalfDiskHashMap( final @NonNull Configuration configuration, - final long mapSize, + final long initialCapacity, final Path storeDir, final String storeName, final String legacyStoreName, final boolean preferDiskBasedIndex) throws IOException { - requireNonNull(configuration); - this.merkleDbConfig = configuration.getConfigData(MerkleDbConfig.class); + this.config = requireNonNull(configuration); + final MerkleDbConfig merkleDbConfig = this.config.getConfigData(MerkleDbConfig.class); + this.goodAverageBucketEntryCount = merkleDbConfig.goodAverageBucketEntryCount(); + // Max number of keys is limited by merkleDbConfig.maxNumberOfKeys. Number of buckets is, + // on average, GOOD_AVERAGE_BUCKET_ENTRY_COUNT times smaller than the number of keys. To + // be on the safe side, double that amount and use as a hard limit for bucket index size + final long bucketIndexCapacity = merkleDbConfig.maxNumOfKeys() * 2 / goodAverageBucketEntryCount; this.storeName = storeName; Path indexFile = storeDir.resolve(storeName + BUCKET_INDEX_FILENAME_SUFFIX); // create bucket pool @@ -230,7 +239,7 @@ public HalfDiskHashMap( + "]."); } metaIn.readInt(); // backwards compatibility, was: minimumBuckets - numOfBuckets = metaIn.readInt(); + setNumberOfBuckets(metaIn.readInt()); } if (loadedLegacyMetadata) { Files.delete(metaDataFile); @@ -248,14 +257,14 @@ public HalfDiskHashMap( final boolean forceIndexRebuilding = merkleDbConfig.indexRebuildingEnforced(); if (Files.exists(indexFile) && !forceIndexRebuilding) { bucketIndexToBucketLocation = preferDiskBasedIndex - ? new LongListDisk(indexFile, numOfBuckets, configuration) - : new LongListOffHeap(indexFile, numOfBuckets, configuration); + ? new LongListDisk(indexFile, bucketIndexCapacity, configuration) + : new LongListOffHeap(indexFile, bucketIndexCapacity, configuration); loadedDataCallback = null; } else { // create new index and setup call back to rebuild bucketIndexToBucketLocation = preferDiskBasedIndex - ? 
new LongListDisk(numOfBuckets, configuration) - : new LongListOffHeap(numOfBuckets, configuration); + ? new LongListDisk(bucketIndexCapacity, configuration) + : new LongListOffHeap(bucketIndexCapacity, configuration); loadedDataCallback = (dataLocation, bucketData) -> { final Bucket bucket = bucketPool.getBucket(); bucket.readFrom(bucketData); @@ -266,14 +275,14 @@ public HalfDiskHashMap( // create store dir Files.createDirectories(storeDir); // calculate number of entries we can store in a disk page - final int minimumBuckets = (int) (mapSize / GOOD_AVERAGE_BUCKET_ENTRY_COUNT); + final int minimumBuckets = (int) (initialCapacity / goodAverageBucketEntryCount); // numOfBuckets is the nearest power of two greater than minimumBuckets with a min of 2 - numOfBuckets = Math.max(Integer.highestOneBit(minimumBuckets) * 2, 2); + setNumberOfBuckets(Math.max(Integer.highestOneBit(minimumBuckets) * 2, 2)); // create new index bucketIndexToBucketLocation = preferDiskBasedIndex - ? new LongListDisk(numOfBuckets, configuration) - : new LongListOffHeap(numOfBuckets, configuration); - // we are new so no need for a loadedDataCallback + ? new LongListDisk(bucketIndexCapacity, configuration) + : new LongListOffHeap(bucketIndexCapacity, configuration); + // we are new, so no need for a loadedDataCallback loadedDataCallback = null; // write metadata writeMetadata(storeDir); @@ -284,11 +293,12 @@ public HalfDiskHashMap( minimumBuckets, numOfBuckets); } - bucketIndexToBucketLocation.updateValidRange(0, numOfBuckets - 1); + bucketIndexToBucketLocation.updateValidRange(0, numOfBuckets.get() - 1); // create file collection fileCollection = new DataFileCollection( // Need: propagate MerkleDb merkleDbConfig from the database merkleDbConfig, storeDir, storeName, legacyStoreName, loadedDataCallback); + fileCollection.updateValidKeyRange(0, numOfBuckets.get() - 1); } private void writeMetadata(final Path dir) throws IOException { @@ -296,7 +306,7 @@ private void writeMetadata(final Path dir) throws IOException { new DataOutputStream(Files.newOutputStream(dir.resolve(storeName + METADATA_FILENAME_SUFFIX)))) { metaOut.writeInt(METADATA_FILE_FORMAT_VERSION); metaOut.writeInt(0); // backwards compatibility, was: minimumBuckets - metaOut.writeInt(numOfBuckets); + metaOut.writeInt(numOfBuckets.get()); metaOut.flush(); } } @@ -324,34 +334,43 @@ public void repair(final long firstLeafPath, final long lastLeafPath, final Memo // If no stale bucket entries are found, no need to create a new bucket data file final AtomicBoolean newDataFile = new AtomicBoolean(false); final AtomicLong liveEntries = new AtomicLong(0); - for (int i = 0; i < numOfBuckets; i++) { + final int bucketCount = numOfBuckets.get(); + final int bucketMask = (1 << bucketMaskBits.get()) - 1; + final LongList bucketIndex = bucketIndexToBucketLocation; + for (int i = 0; i < bucketCount; i++) { final long bucketId = i; - final long bucketDataLocation = bucketIndexToBucketLocation.get(bucketId); + final long bucketDataLocation = bucketIndex.get(bucketId); if (bucketDataLocation <= 0) { continue; } - final BufferedData bucketData = - fileCollection.readDataItemUsingIndex(bucketIndexToBucketLocation, bucketId); + final BufferedData bucketData = fileCollection.readDataItemUsingIndex(bucketIndex, bucketId); if (bucketData == null) { logger.warn("Delete bucket (not found): {}, dataLocation={}", bucketId, bucketDataLocation); - bucketIndexToBucketLocation.remove(bucketId); + bucketIndex.remove(bucketId); continue; } - try (final ParsedBucket bucket = new ParsedBucket()) 
{ + try (ParsedBucket bucket = new ParsedBucket()) { bucket.readFrom(bucketData); - if (bucket.getBucketIndex() != bucketId) { + final long loadedBucketId = bucket.getBucketIndex(); + // Check bucket index. It should match bucketId, unless it's an old bucket created before + // HDHM was resized + if ((loadedBucketId & bucketId) != loadedBucketId) { logger.warn(MERKLE_DB.getMarker(), "Delete bucket (stale): {}", bucketId); - bucketIndexToBucketLocation.remove(bucketId); + bucketIndex.remove(bucketId); continue; } bucket.forEachEntry(entry -> { final Bytes keyBytes = entry.getKeyBytes(); final long path = entry.getValue(); + final int hashCode = entry.getHashCode(); try { boolean removeKey = true; if ((path < firstLeafPath) || (path > lastLeafPath)) { logger.warn( MERKLE_DB.getMarker(), "Delete key (path range): key={}, path={}", keyBytes, path); + } else if ((hashCode & loadedBucketId) != loadedBucketId) { + logger.warn( + MERKLE_DB.getMarker(), "Delete key (hash code): key={}, path={}", keyBytes, path); } else { final BufferedData recordBytes = store.get(path); if (recordBytes == null) { @@ -374,7 +393,7 @@ public void repair(final long firstLeafPath, final long lastLeafPath, final Memo startWriting(); } delete(keyBytes, entry.getHashCode()); - } else { + } else if ((hashCode & bucketMask) == bucketId) { liveEntries.incrementAndGet(); } } catch (final Exception e) { @@ -544,7 +563,7 @@ public void deleteIfEqual(final Bytes keyBytes, final int keyHashCode, final lon */ private void resetEndWriting(final ForkJoinPool pool, final int size) { exceptionOccurred.set(null); - numBuckets.set(size); + updatedBucketsCount.set(size); bucketPermits.set(MAX_IN_FLIGHT); lastStoreTask.set(null); storeBucketTasksCreated.set(0); @@ -575,7 +594,7 @@ public DataFileReader endWriting() throws IOException { final Iterator> it = oneTransactionsData.keyValuesView().iterator(); fileCollection.startWriting(); - final ForkJoinPool pool = getFlushingPool(merkleDbConfig); + final ForkJoinPool pool = getFlushingPool(config); resetEndWriting(pool, size); // Create a task to submit bucket processing tasks. This initial submit task // is scheduled to run right away. 
Subsequent submit tasks will be run only @@ -591,7 +610,15 @@ public DataFileReader endWriting() throws IOException { throw new IOException(exceptionOccurred.get()); } // close files session - dataFileReader = fileCollection.endWriting(0, numOfBuckets); + dataFileReader = fileCollection.endWriting(); + logger.info( + MERKLE_DB.getMarker(), + "Finished writing to {}, newFile={}, numOfFiles={}, minimumValidKey={}, maximumValidKey={}", + storeName, + dataFileReader.getIndex(), + fileCollection.getNumOfFiles(), + 0, + numOfBuckets.get() - 1); } else { dataFileReader = null; } @@ -687,7 +714,7 @@ private void createAndScheduleStoreTask(final Bucket bucket) { // The first task: no dependency on the prev task, can be executed rightaway storeTask.send(); } - if (storeBucketTasksCreated.incrementAndGet() == numBuckets.get()) { + if (storeBucketTasksCreated.incrementAndGet() == updatedBucketsCount.get()) { // The last task: no dependency on the next task, can be executed as soon as // its prev task is complete, no need to wait until the next task dependency // is set @@ -706,7 +733,7 @@ protected boolean onExecute() throws IOException { } else { // Read from bytes bucket.readFrom(bucketData); - if (bucketIndex != bucket.getBucketIndex()) { + if ((bucket.getBucketIndex() & bucketIndex) != bucket.getBucketIndex()) { logger.error( MERKLE_DB.getMarker(), "Bucket index integrity check " + bucketIndex + " != " + bucket.getBucketIndex()); @@ -718,8 +745,9 @@ may be different from the expected one. In this case, we clear the bucket (as it anyway) and set the correct index. */ bucket.clear(); - bucket.setBucketIndex(bucketIndex); } + // Clear old bucket entries with wrong hash codes + bucket.sanitize(bucketIndex, bucketMaskBits.get()); } // Apply all updates keyUpdates.forEachKeyValue(bucket::putValue); @@ -730,7 +758,12 @@ may be different from the expected one. In this case, we clear the bucket (as it @Override protected void onException(final Throwable t) { - logger.error(MERKLE_DB.getMarker(), "Failed to read / update bucket " + bucketIndex, t); + logger.error( + MERKLE_DB.getMarker(), + "Failed to read / update bucket {}, location {}", + bucketIndex, + DataFileCommon.dataLocationToString(bucketIndexToBucketLocation.get(bucketIndex)), + t); exceptionOccurred.set(t); // Make sure the writing thread is resumed notifyTaskRef.get().completeExceptionally(t); @@ -837,7 +870,7 @@ public long get(final Bytes keyBytes, final int keyHashCode, final long notFound throw new IllegalArgumentException("Can not get a null key"); } final int bucketIndex = computeBucketIndex(keyHashCode); - try (final Bucket bucket = readBucket(bucketIndex)) { + try (Bucket bucket = readBucket(bucketIndex)) { if (bucket != null) { return bucket.findValue(keyHashCode, keyBytes, notFoundValue); } @@ -855,6 +888,42 @@ private Bucket readBucket(final int bucketIndex) throws IOException { return bucket; } + // -- Resize -- + + /** + * Check if this map should be resized, given the new virtual map size. If the new map size + * exceeds 80% of the current number of buckets times {@link #goodAverageBucketEntryCount}, + * the map is resized by doubling the number of buckets. 
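(Worked example of this trigger, illustrative only; it assumes the default goodAverageBucketEntryCount of 32 used elsewhere in this change and the constant 70 from the check below.)
//   currentSize = lastLeafPath - firstLeafPath + 1, e.g. 1_500 live leaves with 64 buckets:
//   1_500 / 64 * 100 = 2_300  >  32 * 70 = 2_240   -> bucket count doubles to 128
//   1_400 / 64 * 100 = 2_100  <= 32 * 70 = 2_240   -> no resize yet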
+ * + * @param firstLeafPath The first leaf virtual path + * @param lastLeafPath The last leaf virtual path + */ + public void resizeIfNeeded(final long firstLeafPath, final long lastLeafPath) { + final long currentSize = lastLeafPath - firstLeafPath + 1; + if (currentSize / numOfBuckets.get() * 100 <= goodAverageBucketEntryCount * 70L) { + // No need to resize yet + return; + } + + final int oldSize = numOfBuckets.get(); + final int newSize = oldSize * 2; + logger.info(MERKLE_DB.getMarker(), "Resize HDHM {} to {} buckets", storeName, newSize); + + bucketIndexToBucketLocation.updateValidRange(0, newSize - 1); + // This straightforward loop works fast enough for now. If in the future it needs to be + // even faster, let's consider copying index batches and/or parallel index updates + for (int i = 0; i < oldSize; i++) { + final long value = bucketIndexToBucketLocation.get(i); + if (value != DataFileCommon.NON_EXISTENT_DATA_LOCATION) { + bucketIndexToBucketLocation.put(i + oldSize, value); + } + } + fileCollection.updateValidKeyRange(0, newSize - 1); + + setNumberOfBuckets(newSize); + logger.info(MERKLE_DB.getMarker(), "Resize HDHM {} to {} buckets done", storeName, newSize); + } + // ================================================================================================================= // Debugging Print API @@ -865,10 +934,10 @@ public void printStats() { """ HalfDiskHashMap Stats { numOfBuckets = {} - GOOD_AVERAGE_BUCKET_ENTRY_COUNT = {} + goodAverageBucketEntryCount = {} }""", numOfBuckets, - GOOD_AVERAGE_BUCKET_ENTRY_COUNT); + goodAverageBucketEntryCount); } public DataFileCollection getFileCollection() { @@ -882,6 +951,20 @@ public CASableLongIndex getBucketIndexToBucketLocation() { // ================================================================================================================= // Private API + /** + * Updates the number of buckets and bucket mask bits. The new value must be a power of 2. + */ + private void setNumberOfBuckets(final int newValue) { + numOfBuckets.set(newValue); + final int trailingZeroes = Integer.numberOfTrailingZeros(newValue); + bucketMaskBits.set(trailingZeroes); + } + + // For testing purposes + int getNumOfBuckets() { + return numOfBuckets.get(); + } + /** * Computes which bucket a key with the given hash falls. Depends on the fact the numOfBuckets * is a power of two. Based on same calculation that is used in java HashMap. @@ -890,6 +973,6 @@ public CASableLongIndex getBucketIndexToBucketLocation() { * @return the index of the bucket that key falls in */ private int computeBucketIndex(final int keyHash) { - return (numOfBuckets - 1) & keyHash; + return (numOfBuckets.get() - 1) & keyHash; } } diff --git a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/ParsedBucket.java b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/ParsedBucket.java index ca2b11a8a305..a8dbd61ee313 100644 --- a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/ParsedBucket.java +++ b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/ParsedBucket.java @@ -223,12 +223,12 @@ public String toString() { } /** - * A single entry in a bucket, which contains key hash code, value (usually, path), and - * full serialized key. A bucket may contain multiple such entries. + * A single entry in a bucket, which contains key hash code, value (usually, path), and full + * serialized key. A bucket may contain multiple such entries. * *
<p>
This class would be a record, if it was immutable. However, when a value is updated - * in a bucket, and a bucket entry already exists for the same key, instead of creating - * a new entry, we just update the value in the existing entry. + * in a bucket, and a bucket entry already exists for the same key, instead of creating a new + * entry, we just update the value in the existing entry. */ public static class BucketEntry { @@ -257,11 +257,11 @@ public BucketEntry(final ReadableSequentialData entryData) { while (entryData.hasRemaining()) { final int tag = entryData.readVarInt(false); final int fieldNum = tag >> TAG_FIELD_OFFSET; - if (fieldNum == FIELD_BUCKETENTRY_HASHCODE.number()) { + if (fieldNum == Bucket.FIELD_BUCKETENTRY_HASHCODE.number()) { hashCode = entryData.readInt(); - } else if (fieldNum == FIELD_BUCKETENTRY_VALUE.number()) { + } else if (fieldNum == Bucket.FIELD_BUCKETENTRY_VALUE.number()) { value = entryData.readLong(); - } else if (fieldNum == FIELD_BUCKETENTRY_KEYBYTES.number()) { + } else if (fieldNum == Bucket.FIELD_BUCKETENTRY_KEYBYTES.number()) { final int bytesSize = entryData.readVarInt(false); keyBytes = entryData.readBytes(bytesSize); } else { @@ -297,21 +297,22 @@ public Bytes getKeyBytes() { public int sizeInBytes() { int size = 0; - size += ProtoWriterTools.sizeOfTag(FIELD_BUCKETENTRY_HASHCODE, ProtoConstants.WIRE_TYPE_FIXED_32_BIT) + size += ProtoWriterTools.sizeOfTag(Bucket.FIELD_BUCKETENTRY_HASHCODE, ProtoConstants.WIRE_TYPE_FIXED_32_BIT) + Integer.BYTES; - size += ProtoWriterTools.sizeOfTag(FIELD_BUCKETENTRY_VALUE, ProtoConstants.WIRE_TYPE_FIXED_64_BIT) + size += ProtoWriterTools.sizeOfTag(Bucket.FIELD_BUCKETENTRY_VALUE, ProtoConstants.WIRE_TYPE_FIXED_64_BIT) + Long.BYTES; - size += ProtoWriterTools.sizeOfDelimited(FIELD_BUCKETENTRY_KEYBYTES, Math.toIntExact(keyBytes.length())); + size += ProtoWriterTools.sizeOfDelimited( + Bucket.FIELD_BUCKETENTRY_KEYBYTES, Math.toIntExact(keyBytes.length())); return size; } public void writeTo(final WritableSequentialData out) { - ProtoWriterTools.writeTag(out, FIELD_BUCKETENTRY_HASHCODE); + ProtoWriterTools.writeTag(out, Bucket.FIELD_BUCKETENTRY_HASHCODE); out.writeInt(hashCode); - ProtoWriterTools.writeTag(out, FIELD_BUCKETENTRY_VALUE); + ProtoWriterTools.writeTag(out, Bucket.FIELD_BUCKETENTRY_VALUE); out.writeLong(value); ProtoWriterTools.writeDelimited( - out, FIELD_BUCKETENTRY_KEYBYTES, Math.toIntExact(keyBytes.length()), keyBytes::writeTo); + out, Bucket.FIELD_BUCKETENTRY_KEYBYTES, Math.toIntExact(keyBytes.length()), keyBytes::writeTo); } } } diff --git a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/ReusableBucketPool.java b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/ReusableBucketPool.java index 01ff6d59394b..06dfc3fd41b2 100644 --- a/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/ReusableBucketPool.java +++ b/platform-sdk/swirlds-merkledb/src/main/java/com/swirlds/merkledb/files/hashmap/ReusableBucketPool.java @@ -24,12 +24,10 @@ * for other threads) on a single thread, this class would be as simple as a * single {@link ThreadLocal} object. This is not the case, unfortunately. For * example, when HDHM background reading threads read buckets from disk, buckets - * are requested from the pool by {@link BucketSerializer} as a part of data - * file collection read call. Then buckets are updated and put to a queue, which - * is processed on a different thread, virtual pipeline (aka lifecycle) thread. 
- * Only after that buckets can be reused. This is why the pool is implemented as - * an array of buckets with fast concurrent read/write access from multiple - * threads. + * are requested from the pool. Then buckets are updated and then stored to + * disk on a different thread. After that buckets can be reused. This is why the + * pool is implemented as an array of buckets with fast concurrent read/write + * access from multiple threads. */ public class ReusableBucketPool { diff --git a/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/MerkleDbBuilderTest.java b/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/MerkleDbBuilderTest.java index bc226e5cee1a..5c718586a8a0 100644 --- a/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/MerkleDbBuilderTest.java +++ b/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/MerkleDbBuilderTest.java @@ -21,6 +21,8 @@ class MerkleDbBuilderTest { + private static final long INITIAL_SIZE = 1_000_000; + private static Path testDirectory; @BeforeAll @@ -37,15 +39,11 @@ public void afterCheckNoDbLeftOpen() { private MerkleDbTableConfig createTableConfig() { final MerkleDbConfig merkleDbConfig = CONFIGURATION.getConfigData(MerkleDbConfig.class); return new MerkleDbTableConfig( - (short) 1, - DigestType.SHA_384, - merkleDbConfig.maxNumOfKeys(), - merkleDbConfig.hashesRamToDiskThreshold()); + (short) 1, DigestType.SHA_384, INITIAL_SIZE, merkleDbConfig.hashesRamToDiskThreshold()); } - private MerkleDbTableConfig createTableConfig(final long maxNumOfKeys, final long hashesRamToDiskThreshold) { - final MerkleDbConfig merkleDbConfig = CONFIGURATION.getConfigData(MerkleDbConfig.class); - return new MerkleDbTableConfig((short) 1, DigestType.SHA_384, maxNumOfKeys, hashesRamToDiskThreshold); + private MerkleDbTableConfig createTableConfig(final long initialCapacity, final long hashesRamToDiskThreshold) { + return new MerkleDbTableConfig((short) 1, DigestType.SHA_384, initialCapacity, hashesRamToDiskThreshold); } @Test @@ -87,7 +85,7 @@ public void testBuilderDefaults() throws IOException { final Configuration configuration = new TestConfigBuilder().getOrCreateConfig(); final MerkleDbConfig merkleDbConfig = configuration.getConfigData(MerkleDbConfig.class); assertFalse(merkleDbDataSource.isPreferDiskBasedIndexes()); - assertEquals(merkleDbConfig.maxNumOfKeys(), merkleDbDataSource.getMaxNumberOfKeys()); + assertEquals(INITIAL_SIZE, merkleDbDataSource.getInitialCapacity()); assertEquals(merkleDbConfig.hashesRamToDiskThreshold(), merkleDbDataSource.getHashesRamToDiskThreshold()); // set explicitly above assertFalse(merkleDbDataSource.isCompactionEnabled()); @@ -112,7 +110,7 @@ public void testCustomTableConfig() throws IOException { assertEquals( defaultDbPath.resolve("tables").resolve("test3-" + merkleDbDataSource.getTableId()), merkleDbDataSource.getStorageDir()); - assertEquals(1999, merkleDbDataSource.getMaxNumberOfKeys()); + assertEquals(1999, merkleDbDataSource.getInitialCapacity()); assertEquals(Integer.MAX_VALUE >> 4, merkleDbDataSource.getHashesRamToDiskThreshold()); // set explicitly above assertTrue(merkleDbDataSource.isCompactionEnabled()); diff --git a/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/MerkleDbTableConfigTest.java b/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/MerkleDbTableConfigTest.java index ae3850afa145..8ec6700bdc08 100644 --- a/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/MerkleDbTableConfigTest.java +++ 
b/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/MerkleDbTableConfigTest.java @@ -22,7 +22,7 @@ public static void setup() throws Exception { } @Test - void testIllegalMaxNumOfKeys() { + void testIllegalCapacity() { final MerkleDbConfig merkleDbConfig = CONFIGURATION.getConfigData(MerkleDbConfig.class); Assertions.assertThrows( IllegalArgumentException.class, @@ -36,10 +36,8 @@ void testIllegalMaxNumOfKeys() { @Test void testIllegalHashesRamToDiskThreshold() { - final MerkleDbConfig merkleDbConfig = CONFIGURATION.getConfigData(MerkleDbConfig.class); Assertions.assertThrows( - IllegalArgumentException.class, - () -> new MerkleDbTableConfig((short) 1, DigestType.SHA_384, merkleDbConfig.maxNumOfKeys(), -1)); + IllegalArgumentException.class, () -> new MerkleDbTableConfig((short) 1, DigestType.SHA_384, 1000, -1)); } @Test @@ -48,7 +46,7 @@ void deserializeDefaultsTest() throws IOException { final MerkleDbTableConfig tableConfig = new MerkleDbTableConfig( (short) 1, DigestType.SHA_384, 1_000, 0); // Default protobuf value, will not be serialized - Assertions.assertEquals(1_000, tableConfig.getMaxNumberOfKeys()); + Assertions.assertEquals(1_000, tableConfig.getInitialCapacity()); Assertions.assertEquals(0, tableConfig.getHashesRamToDiskThreshold()); final ByteArrayOutputStream bout = new ByteArrayOutputStream(); @@ -62,7 +60,7 @@ void deserializeDefaultsTest() throws IOException { restored = new MerkleDbTableConfig(in); } - Assertions.assertEquals(1_000, restored.getMaxNumberOfKeys()); + Assertions.assertEquals(1_000, restored.getInitialCapacity()); // Fields that aren't deserialized should have default protobuf values (e.g. zero), not // default MerkleDbConfig values Assertions.assertEquals(0, restored.getHashesRamToDiskThreshold()); diff --git a/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/files/DataFileReaderCloseTest.java b/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/files/DataFileReaderCloseTest.java index 506300bf4f5e..a2eb4b49d9f0 100644 --- a/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/files/DataFileReaderCloseTest.java +++ b/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/files/DataFileReaderCloseTest.java @@ -42,6 +42,7 @@ static void teardown() throws IOException { @Test void readerIsOpenTest() throws Exception { final int COUNT = 100; + collection.updateValidKeyRange(0, COUNT - 1); collection.startWriting(); final LongList index = new LongListOffHeap(COUNT / 10, COUNT, COUNT / 10); index.updateValidRange(0, COUNT - 1); @@ -57,7 +58,7 @@ void readerIsOpenTest() throws Exception { 2 * Long.BYTES)); } //noinspection resource - collection.endWriting(0, COUNT - 1); + collection.endWriting(); final AtomicBoolean readingThreadStarted = new AtomicBoolean(false); final AtomicReference exceptionOccurred = new AtomicReference<>(); final Thread readingThread = new Thread(() -> { diff --git a/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/files/hashmap/HalfDiskHashMapTest.java b/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/files/hashmap/HalfDiskHashMapTest.java index 155dc41fe88e..93dac91bc3a5 100644 --- a/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/files/hashmap/HalfDiskHashMapTest.java +++ b/platform-sdk/swirlds-merkledb/src/test/java/com/swirlds/merkledb/files/hashmap/HalfDiskHashMapTest.java @@ -4,8 +4,10 @@ import static com.swirlds.merkledb.test.fixtures.MerkleDbTestUtils.CONFIGURATION; import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import com.hedera.pbj.runtime.io.buffer.BufferedData; import com.hedera.pbj.runtime.io.buffer.Bytes; import com.swirlds.merkledb.collections.LongList; import com.swirlds.merkledb.collections.LongListHeap; @@ -18,12 +20,15 @@ import com.swirlds.virtualmap.datasource.VirtualLeafBytes; import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; import java.util.Random; import org.hiero.base.io.streams.SerializableDataInputStream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; @SuppressWarnings({"SameParameterValue", "unchecked"}) class HalfDiskHashMapTest { @@ -35,28 +40,28 @@ class HalfDiskHashMapTest { // ================================================================================================================= // Helper Methods - private HalfDiskHashMap createNewTempMap(FilesTestType testType, int count) throws IOException { + private HalfDiskHashMap createNewTempMap(final String name, final long count) throws IOException { // create map HalfDiskHashMap map = new HalfDiskHashMap( - CONFIGURATION, count, tempDirPath.resolve(testType.name()), "HalfDiskHashMapTest", null, false); + CONFIGURATION, count, tempDirPath.resolve(name), "HalfDiskHashMapTest", null, false); map.printStats(); return map; } - private MemoryIndexDiskKeyValueStore createNewTempKV(FilesTestType testType, int count) throws IOException { + private MemoryIndexDiskKeyValueStore createNewTempKV(final String name, final int count) throws IOException { final MerkleDbConfig merkleDbConfig = CONFIGURATION.getConfigData(MerkleDbConfig.class); final LongList index = new LongListHeap(count, CONFIGURATION); return new MemoryIndexDiskKeyValueStore( - merkleDbConfig, - tempDirPath.resolve(testType.name() + "_kv"), - "HalfDiskHashMapTestKV", - null, - null, - index); + merkleDbConfig, tempDirPath.resolve(name + "_kv"), "HalfDiskHashMapTestKV", null, null, index); } private static void createSomeData( - FilesTestType testType, HalfDiskHashMap map, int start, int count, long dataMultiplier) throws IOException { + final FilesTestType testType, + final HalfDiskHashMap map, + final int start, + final int count, + final long dataMultiplier) + throws IOException { map.startWriting(); for (int i = start; i < (start + count); i++) { final VirtualKey key = testType.createVirtualLongKey(i); @@ -69,7 +74,12 @@ private static void createSomeData( } private static void checkData( - FilesTestType testType, HalfDiskHashMap map, int start, int count, long dataMultiplier) throws IOException { + final FilesTestType testType, + final HalfDiskHashMap map, + final int start, + final int count, + final long dataMultiplier) + throws IOException { long START = System.currentTimeMillis(); for (int i = start; i < (start + count); i++) { final var key = testType.createVirtualLongKey(i); @@ -91,98 +101,103 @@ void createDataAndCheck(FilesTestType testType) throws Exception { final Path tempSnapshotDir = tempDirPath.resolve("DataFileTestSnapshot_" + testType.name()); final int count = 10_000; // create map - final HalfDiskHashMap map = createNewTempMap(testType, count); - // create some data - 
createSomeData(testType, map, 1, count, 1); - // sequentially check data - checkData(testType, map, 1, count, 1); - // randomly check data - Random random = new Random(1234); - for (int j = 1; j < (count * 2); j++) { - int i = 1 + random.nextInt(count); - final VirtualKey key = testType.createVirtualLongKey(i); - long result = map.get(testType.keySerializer.toBytes(key), key.hashCode(), 0); - assertEquals(i, result, "unexpected value of newVirtualLongKey"); + try (HalfDiskHashMap map = createNewTempMap("createDataAndCheck", count)) { + // create some data + createSomeData(testType, map, 1, count, 1); + // sequentially check data + checkData(testType, map, 1, count, 1); + // randomly check data + Random random = new Random(1234); + for (int j = 1; j < (count * 2); j++) { + int i = 1 + random.nextInt(count); + final VirtualKey key = testType.createVirtualLongKey(i); + long result = map.get(testType.keySerializer.toBytes(key), key.hashCode(), 0); + assertEquals(i, result, "unexpected value of newVirtualLongKey"); + } + // create snapshot + map.snapshot(tempSnapshotDir); + // open snapshot and check data + HalfDiskHashMap mapFromSnapshot = + new HalfDiskHashMap(CONFIGURATION, count, tempSnapshotDir, "HalfDiskHashMapTest", null, false); + mapFromSnapshot.printStats(); + checkData(testType, mapFromSnapshot, 1, count, 1); + // check deletion + map.startWriting(); + final VirtualKey key5 = testType.createVirtualLongKey(5); + final VirtualKey key50 = testType.createVirtualLongKey(50); + final VirtualKey key500 = testType.createVirtualLongKey(500); + map.delete(testType.keySerializer.toBytes(key5), key5.hashCode()); + map.delete(testType.keySerializer.toBytes(key50), key50.hashCode()); + map.delete(testType.keySerializer.toBytes(key500), key500.hashCode()); + map.endWriting(); + assertEquals(-1, map.get(testType.keySerializer.toBytes(key5), key5.hashCode(), -1), "Expect not to exist"); + assertEquals( + -1, map.get(testType.keySerializer.toBytes(key50), key50.hashCode(), -1), "Expect not to exist"); + assertEquals( + -1, map.get(testType.keySerializer.toBytes(key500), key500.hashCode(), -1), "Expect not to exist"); + checkData(testType, map, 1, 4, 1); + checkData(testType, map, 6, 43, 1); + checkData(testType, map, 51, 448, 1); + checkData(testType, map, 501, 9499, 1); + // check close and try read after + map.close(); + assertEquals( + -1, + map.get(testType.keySerializer.toBytes(key5), key5.hashCode(), -1), + "Expect not found result as just closed the map!"); } - // create snapshot - map.snapshot(tempSnapshotDir); - // open snapshot and check data - HalfDiskHashMap mapFromSnapshot = - new HalfDiskHashMap(CONFIGURATION, count, tempSnapshotDir, "HalfDiskHashMapTest", null, false); - mapFromSnapshot.printStats(); - checkData(testType, mapFromSnapshot, 1, count, 1); - // check deletion - map.startWriting(); - final VirtualKey key5 = testType.createVirtualLongKey(5); - final VirtualKey key50 = testType.createVirtualLongKey(50); - final VirtualKey key500 = testType.createVirtualLongKey(500); - map.delete(testType.keySerializer.toBytes(key5), key5.hashCode()); - map.delete(testType.keySerializer.toBytes(key50), key50.hashCode()); - map.delete(testType.keySerializer.toBytes(key500), key500.hashCode()); - map.endWriting(); - assertEquals(-1, map.get(testType.keySerializer.toBytes(key5), key5.hashCode(), -1), "Expect not to exist"); - assertEquals(-1, map.get(testType.keySerializer.toBytes(key50), key50.hashCode(), -1), "Expect not to exist"); - assertEquals(-1, 
map.get(testType.keySerializer.toBytes(key500), key500.hashCode(), -1), "Expect not to exist"); - checkData(testType, map, 1, 4, 1); - checkData(testType, map, 6, 43, 1); - checkData(testType, map, 51, 448, 1); - checkData(testType, map, 501, 9499, 1); - // check close and try read after - map.close(); - assertEquals( - -1, - map.get(testType.keySerializer.toBytes(key5), key5.hashCode(), -1), - "Expect not found result as just closed the map!"); } @ParameterizedTest @EnumSource(FilesTestType.class) void multipleWriteBatchesAndMerge(FilesTestType testType) throws Exception { // create map - final HalfDiskHashMap map = createNewTempMap(testType, 10_000); - final DataFileCompactor dataFileCompactor = new DataFileCompactor( - CONFIGURATION.getConfigData(MerkleDbConfig.class), - "HalfDiskHashMapTest", - map.getFileCollection(), - map.getBucketIndexToBucketLocation(), - null, - null, - null, - null); - // create some data - createSomeData(testType, map, 1, 1111, 1); - checkData(testType, map, 1, 1111, 1); - // create some more data - createSomeData(testType, map, 1111, 3333, 1); - checkData(testType, map, 1, 3333, 1); - // create some more data - createSomeData(testType, map, 1111, 10_000, 1); - checkData(testType, map, 1, 10_000, 1); - // do a merge - dataFileCompactor.compact(); - // check all data after - checkData(testType, map, 1, 10_000, 1); + try (HalfDiskHashMap map = createNewTempMap("multipleWriteBatchesAndMerge", 10_000)) { + final DataFileCompactor dataFileCompactor = new DataFileCompactor( + CONFIGURATION.getConfigData(MerkleDbConfig.class), + "HalfDiskHashMapTest", + map.getFileCollection(), + map.getBucketIndexToBucketLocation(), + null, + null, + null, + null); + // create some data + createSomeData(testType, map, 1, 1111, 1); + checkData(testType, map, 1, 1111, 1); + // create some more data + createSomeData(testType, map, 1111, 3333, 1); + checkData(testType, map, 1, 3333, 1); + // create some more data + createSomeData(testType, map, 1111, 10_000, 1); + checkData(testType, map, 1, 10_000, 1); + // do a merge + dataFileCompactor.compact(); + // check all data after + checkData(testType, map, 1, 10_000, 1); + } } @ParameterizedTest @EnumSource(FilesTestType.class) void updateData(FilesTestType testType) throws Exception { // create map - final HalfDiskHashMap map = createNewTempMap(testType, 1000); - // create some data - createSomeData(testType, map, 0, 1000, 1); - checkData(testType, map, 0, 1000, 1); - // update some data - createSomeData(testType, map, 200, 400, 2); - checkData(testType, map, 0, 200, 1); - checkData(testType, map, 200, 400, 2); - checkData(testType, map, 600, 400, 1); + try (HalfDiskHashMap map = createNewTempMap("updateData", 1000)) { + // create some data + createSomeData(testType, map, 0, 1000, 1); + checkData(testType, map, 0, 1000, 1); + // update some data + createSomeData(testType, map, 200, 400, 2); + checkData(testType, map, 0, 200, 1); + checkData(testType, map, 200, 400, 2); + checkData(testType, map, 600, 400, 1); + } } @Test void testOverwritesWithCollision() throws IOException { final FilesTestType testType = FilesTestType.fixed; - try (final HalfDiskHashMap map = createNewTempMap(testType, 1000)) { + try (HalfDiskHashMap map = createNewTempMap("testOverwritesWithCollision", 1000)) { map.startWriting(); for (int i = 100; i < 300; i++) { final VirtualKey key = new CollidableFixedLongKey(i); @@ -195,84 +210,368 @@ void testOverwritesWithCollision() throws IOException { @Test void testRebuildMap() throws Exception { final FilesTestType testType = 
FilesTestType.variable; - final HalfDiskHashMap map = createNewTempMap(testType, 100); + try (HalfDiskHashMap map = createNewTempMap("testRebuildMap", 100)) { + map.startWriting(); + final VirtualKey key1 = testType.createVirtualLongKey(1); + map.put(testType.keySerializer.toBytes(key1), key1.hashCode(), 1); + final VirtualKey key2 = testType.createVirtualLongKey(2); + map.put(testType.keySerializer.toBytes(key2), key2.hashCode(), 2); + map.endWriting(); + map.startWriting(); + final VirtualKey key3 = testType.createVirtualLongKey(3); + map.put(testType.keySerializer.toBytes(key3), key3.hashCode(), 3); + final VirtualKey key4 = testType.createVirtualLongKey(4); + map.put(testType.keySerializer.toBytes(key4), key4.hashCode(), 4); + map.endWriting(); - map.startWriting(); - final VirtualKey key1 = testType.createVirtualLongKey(1); - map.put(testType.keySerializer.toBytes(key1), key1.hashCode(), 1); - final VirtualKey key2 = testType.createVirtualLongKey(2); - map.put(testType.keySerializer.toBytes(key2), key2.hashCode(), 2); - map.endWriting(); - map.startWriting(); - final VirtualKey key3 = testType.createVirtualLongKey(3); - map.put(testType.keySerializer.toBytes(key3), key3.hashCode(), 3); - final VirtualKey key4 = testType.createVirtualLongKey(4); - map.put(testType.keySerializer.toBytes(key4), key4.hashCode(), 4); - map.endWriting(); + assertEquals(1, map.get(testType.keySerializer.toBytes(key1), key1.hashCode(), -1)); + assertEquals(2, map.get(testType.keySerializer.toBytes(key2), key2.hashCode(), -1)); + assertEquals(3, map.get(testType.keySerializer.toBytes(key3), key3.hashCode(), -1)); + assertEquals(4, map.get(testType.keySerializer.toBytes(key4), key4.hashCode(), -1)); + + final MemoryIndexDiskKeyValueStore kv = createNewTempKV("testRebuildMap", 100); + kv.startWriting(); + kv.updateValidKeyRange(2, 4); + final VirtualLeafBytes rec2 = + new VirtualLeafBytes(2, testType.keySerializer.toBytes(key2), key2.hashCode(), Bytes.wrap("12")); + kv.put(2, rec2::writeTo, rec2.getSizeInBytes()); + final VirtualLeafBytes rec3 = + new VirtualLeafBytes(3, testType.keySerializer.toBytes(key3), key3.hashCode(), Bytes.wrap("13")); + kv.put(3, rec3::writeTo, rec3.getSizeInBytes()); + final VirtualLeafBytes rec4 = + new VirtualLeafBytes(4, testType.keySerializer.toBytes(key4), key4.hashCode(), Bytes.wrap("14")); + kv.put(4, rec4::writeTo, rec4.getSizeInBytes()); + kv.endWriting(); + + map.repair(2, 4, kv); - assertEquals(1, map.get(testType.keySerializer.toBytes(key1), key1.hashCode(), -1)); - assertEquals(2, map.get(testType.keySerializer.toBytes(key2), key2.hashCode(), -1)); - assertEquals(3, map.get(testType.keySerializer.toBytes(key3), key3.hashCode(), -1)); - assertEquals(4, map.get(testType.keySerializer.toBytes(key4), key4.hashCode(), -1)); - - final MemoryIndexDiskKeyValueStore kv = createNewTempKV(testType, 100); - kv.startWriting(); - kv.updateValidKeyRange(2, 4); - final VirtualLeafBytes rec2 = - new VirtualLeafBytes(2, testType.keySerializer.toBytes(key2), key2.hashCode(), Bytes.wrap("12")); - kv.put(2, rec2::writeTo, rec2.getSizeInBytes()); - final VirtualLeafBytes rec3 = - new VirtualLeafBytes(3, testType.keySerializer.toBytes(key3), key3.hashCode(), Bytes.wrap("13")); - kv.put(3, rec3::writeTo, rec3.getSizeInBytes()); - final VirtualLeafBytes rec4 = - new VirtualLeafBytes(4, testType.keySerializer.toBytes(key4), key4.hashCode(), Bytes.wrap("14")); - kv.put(4, rec4::writeTo, rec4.getSizeInBytes()); - kv.endWriting(); - - map.repair(2, 4, kv); - - assertEquals(-1, 
map.get(testType.keySerializer.toBytes(key1), key1.hashCode(), -1)); - assertEquals(2, map.get(testType.keySerializer.toBytes(key2), key2.hashCode(), -1)); - assertEquals(3, map.get(testType.keySerializer.toBytes(key3), key3.hashCode(), -1)); - assertEquals(4, map.get(testType.keySerializer.toBytes(key4), key4.hashCode(), -1)); + assertEquals(-1, map.get(testType.keySerializer.toBytes(key1), key1.hashCode(), -1)); + assertEquals(2, map.get(testType.keySerializer.toBytes(key2), key2.hashCode(), -1)); + assertEquals(3, map.get(testType.keySerializer.toBytes(key3), key3.hashCode(), -1)); + assertEquals(4, map.get(testType.keySerializer.toBytes(key4), key4.hashCode(), -1)); + } } @Test void testRebuildIncompleteMap() throws Exception { final FilesTestType testType = FilesTestType.variable; - final HalfDiskHashMap map = createNewTempMap(testType, 100); + try (HalfDiskHashMap map = createNewTempMap("testRebuildIncompleteMap", 100)) { + map.startWriting(); + final VirtualKey key1 = testType.createVirtualLongKey(1); + map.put(testType.keySerializer.toBytes(key1), key1.hashCode(), 1); + final VirtualKey key2 = testType.createVirtualLongKey(2); + map.put(testType.keySerializer.toBytes(key2), key2.hashCode(), 2); + final VirtualKey key3 = testType.createVirtualLongKey(3); + map.put(testType.keySerializer.toBytes(key3), key3.hashCode(), 3); + final VirtualKey key4 = testType.createVirtualLongKey(4); + // No entry for key 4 + map.endWriting(); - map.startWriting(); - final VirtualKey key1 = testType.createVirtualLongKey(1); - map.put(testType.keySerializer.toBytes(key1), key1.hashCode(), 1); - final VirtualKey key2 = testType.createVirtualLongKey(2); - map.put(testType.keySerializer.toBytes(key2), key2.hashCode(), 2); - final VirtualKey key3 = testType.createVirtualLongKey(3); - map.put(testType.keySerializer.toBytes(key3), key3.hashCode(), 3); - final VirtualKey key4 = testType.createVirtualLongKey(4); - // No entry for key 4 - map.endWriting(); + assertEquals(1, map.get(testType.keySerializer.toBytes(key1), key1.hashCode(), -1)); + assertEquals(2, map.get(testType.keySerializer.toBytes(key2), key2.hashCode(), -1)); + assertEquals(3, map.get(testType.keySerializer.toBytes(key3), key3.hashCode(), -1)); + + final MemoryIndexDiskKeyValueStore kv = createNewTempKV("testRebuildIncompleteMap", 100); + kv.startWriting(); + kv.updateValidKeyRange(2, 4); + final VirtualLeafBytes rec2 = + new VirtualLeafBytes(2, testType.keySerializer.toBytes(key2), key2.hashCode(), Bytes.wrap("12")); + kv.put(2, rec2::writeTo, rec2.getSizeInBytes()); + final VirtualLeafBytes rec3 = + new VirtualLeafBytes(3, testType.keySerializer.toBytes(key3), key3.hashCode(), Bytes.wrap("13")); + kv.put(3, rec3::writeTo, rec3.getSizeInBytes()); + final VirtualLeafBytes rec4 = + new VirtualLeafBytes(4, testType.keySerializer.toBytes(key4), key4.hashCode(), Bytes.wrap("14")); + kv.put(4, rec4::writeTo, rec4.getSizeInBytes()); + kv.endWriting(); + + // key4 is missing in the map, it cannot be restored from pathToKeyValue store + assertThrows(IOException.class, () -> map.repair(2, 4, kv)); + } + } + + @ParameterizedTest + @ValueSource(longs = {100, 1000, 2000, 1_000_000, 1_000_000_000}) + void testDefaultNumOfBuckets(final long count) throws Exception { + try (HalfDiskHashMap map = createNewTempMap("testDefaultNumOfBuckets", count)) { + assertEquals(calcExpectedNumOfBuckets(count), map.getNumOfBuckets()); + } + } + + @Test + void testResizeBasic() throws Exception { + try (HalfDiskHashMap map = createNewTempMap("testResizeBasic", 1000)) { + 
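// Expected value (assuming the default goodAverageBucketEntryCount of 32): minimumBuckets =
// 1000 / 32 = 31, rounded up to the next power of two = 32, so both the map constructor and
// calcExpectedNumOfBuckets(1000) should arrive at 32 buckets here.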
final int initialNumOfBuckets = calcExpectedNumOfBuckets(1000); + assertEquals(initialNumOfBuckets, map.getNumOfBuckets()); + map.resizeIfNeeded(99, 198); // map size: 100, no resize needed + assertEquals(initialNumOfBuckets, map.getNumOfBuckets()); + map.resizeIfNeeded(999, 1998); // map size: 1000, buckets should be doubled + assertEquals(initialNumOfBuckets * 2, map.getNumOfBuckets()); + } + } + + @Test + void checkValuesAfterResize() throws Exception { + try (HalfDiskHashMap map = createNewTempMap("checkValuesAfterResize", 200)) { + final int initialNumOfBuckets = calcExpectedNumOfBuckets(200); + assertEquals(initialNumOfBuckets, map.getNumOfBuckets()); + map.startWriting(); + for (int i = 0; i < 100; i++) { + map.put(Bytes.wrap(new byte[] {(byte) i, 10}), 1000 + i, i * 2); + } + map.endWriting(); + map.resizeIfNeeded(499, 998); + for (int i = 0; i < 100; i++) { + final long path = map.get(Bytes.wrap(new byte[] {(byte) i, 10}), 1000 + i, -1); + assertEquals(i * 2, path); + } + for (int i = 100; i < 200; i++) { + final long path = map.get(Bytes.wrap(new byte[] {(byte) i, 10}), 1000 + i, -1); + assertEquals(-1, path); + } + } + } + + @Test + void checkBucketIndexAfterResize() throws Exception { + try (HalfDiskHashMap map = createNewTempMap("checkBucketIndexAfterResize", 200)) { + final int initialNumOfBuckets = calcExpectedNumOfBuckets(200); + assertEquals(initialNumOfBuckets, map.getNumOfBuckets()); // should be 8 + map.startWriting(); + for (int i = 0; i < 100; i++) { + map.put(Bytes.wrap(new byte[] {(byte) i, 10}), 1000 + i, i * 2); + } + map.endWriting(); + final List bucketIndexValues = new ArrayList<>(); + for (int i = 0; i < initialNumOfBuckets; i++) { + bucketIndexValues.add(map.getBucketIndexToBucketLocation().get(i)); + } + assertEquals(initialNumOfBuckets, bucketIndexValues.size()); + map.resizeIfNeeded(499, 998); + assertEquals(initialNumOfBuckets * 2, map.getNumOfBuckets()); + for (int i = 0; i < initialNumOfBuckets; i++) { + assertEquals( + bucketIndexValues.get(i), + map.getBucketIndexToBucketLocation().get(i)); + assertEquals( + bucketIndexValues.get(i), + map.getBucketIndexToBucketLocation().get(initialNumOfBuckets + i)); + } + } + } + + @Test + void getAfterResize() throws Exception { + try (HalfDiskHashMap map = createNewTempMap("getAfterResize", 200)) { + final int initialNumOfBuckets = calcExpectedNumOfBuckets(200); + map.startWriting(); + for (int i = 0; i < 100; i++) { + // These two entries should end up in the same bucket, but they will be in + // different buckets after resize + map.put(Bytes.wrap(new byte[] {(byte) i, (byte) i}), i, i * 2); + map.put(Bytes.wrap(new byte[] {(byte) (i + 100), 11}), i + initialNumOfBuckets, i + 2); + } + map.endWriting(); + map.resizeIfNeeded(499, 998); + assertEquals(initialNumOfBuckets * 2, map.getNumOfBuckets()); + for (int i = 0; i < 100; i++) { + // These two entries should end up in the same bucket, but they will be in + // different buckets after resize + assertEquals(i * 2, map.get(Bytes.wrap(new byte[] {(byte) i, (byte) i}), i, -1)); + assertEquals(-1, map.get(Bytes.wrap(new byte[] {(byte) i, -1}), i, -1)); + assertEquals( + i + 2, map.get(Bytes.wrap(new byte[] {(byte) (i + 100), 11}), i + initialNumOfBuckets, -1)); + } + } + } + + @Test + void checkBucketsAfterPut() throws Exception { + try (HalfDiskHashMap map = createNewTempMap("checkBucketsAfterPut", 200)) { + final int initialNumOfBuckets = calcExpectedNumOfBuckets(200); + map.startWriting(); + for (int i = 0; i < initialNumOfBuckets; i++) { + // These two entries 
should end up in the same bucket, but they will be in + // different buckets after resize + map.put(Bytes.wrap(new byte[] {(byte) i, (byte) i}), i, i * 2L); + map.put(Bytes.wrap(new byte[] {(byte) (i + 100), 11}), i + initialNumOfBuckets, i + 2L); + } + map.endWriting(); + final LongList bucketIndex = (LongList) map.getBucketIndexToBucketLocation(); + for (int i = 0; i < initialNumOfBuckets; i++) { + final BufferedData bucketData = map.getFileCollection().readDataItemUsingIndex(bucketIndex, i); + assertNotNull(bucketData); + try (ParsedBucket bucket = new ParsedBucket()) { + bucket.readFrom(bucketData); + assertEquals(2, bucket.getBucketEntryCount()); + assertEquals(i * 2L, bucket.findValue(i, Bytes.wrap(new byte[] {(byte) i, (byte) i}), -1)); + assertEquals(-1, bucket.findValue(i, Bytes.wrap(new byte[] {(byte) i, -1}), -1)); + assertEquals( + i + 2L, + bucket.findValue( + i + initialNumOfBuckets, Bytes.wrap(new byte[] {(byte) (i + 100), 11}), -1)); + assertEquals( + -1, bucket.findValue(i + initialNumOfBuckets, Bytes.wrap(new byte[] {(byte) i, 11}), -1)); + } + } + } + } + + @Test + void checkBucketsAfterResize() throws Exception { + try (HalfDiskHashMap map = createNewTempMap("checkBucketsAfterResize", 200)) { + final int initialNumOfBuckets = calcExpectedNumOfBuckets(200); + map.startWriting(); + for (int i = 0; i < initialNumOfBuckets; i++) { + // These two entries should end up in the same bucket, but they will be in + // different buckets after resize + map.put(Bytes.wrap(new byte[] {(byte) i}), i, i * 3L); + map.put(Bytes.wrap(new byte[] {(byte) (i + 100)}), i + initialNumOfBuckets, i + 3L); + } + map.endWriting(); + map.resizeIfNeeded(499, 998); + final LongList bucketIndex = (LongList) map.getBucketIndexToBucketLocation(); + for (int i = 0; i < initialNumOfBuckets; i++) { + final BufferedData bucketData = map.getFileCollection().readDataItemUsingIndex(bucketIndex, i); + assertNotNull(bucketData); + try (ParsedBucket bucket = new ParsedBucket()) { + bucket.readFrom(bucketData); + assertEquals(i, bucket.getBucketIndex()); + // Both i and i+initialNumOfBuckets entries are still there + assertEquals(2, bucket.getBucketEntryCount()); + assertEquals(i * 3L, bucket.findValue(i, Bytes.wrap(new byte[] {(byte) i}), -1)); + } + } + for (int i = initialNumOfBuckets; i < initialNumOfBuckets * 2; i++) { + // The old index (before resize) + final int ei = i - initialNumOfBuckets; + final BufferedData bucketData = map.getFileCollection().readDataItemUsingIndex(bucketIndex, i); + assertNotNull(bucketData); + try (ParsedBucket bucket = new ParsedBucket()) { + bucket.readFrom(bucketData); + // The bucket still has the old index + assertEquals(ei, bucket.getBucketIndex()); + // Both i and i+initialNumOfBuckets entries are still there + assertEquals(2, bucket.getBucketEntryCount()); + assertEquals( + ei + 3L, + bucket.findValue(ei + initialNumOfBuckets, Bytes.wrap(new byte[] {(byte) (ei + 100)}), -1)); + } + } + } + } + + @Test + void checkBucketsAfterResizeAndUpdate() throws Exception { + try (HalfDiskHashMap map = createNewTempMap("checkBucketsAfterResizeAndUpdate", 200)) { + final int initialNumOfBuckets = calcExpectedNumOfBuckets(200); + map.startWriting(); + for (int i = 0; i < initialNumOfBuckets; i++) { + // These two entries should end up in the same bucket, but they will be in + // different buckets after resize + map.put(Bytes.wrap(new byte[] {(byte) i}), i, i * 3L); + map.put(Bytes.wrap(new byte[] {(byte) (i + 100)}), i + initialNumOfBuckets, i + 3L); + } + map.endWriting(); + 
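// At this point the map has 8 buckets (capacity hint 200 / the assumed default average of 32
// entries per bucket, rounded up to a power of two). resizeIfNeeded(499, 998) below simulates
// 500 live leaves: 500 / 8 * 100 = 6_200 is well above 32 * 70 = 2_240, so the bucket count
// doubles to 16.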
map.resizeIfNeeded(499, 998); + // Update all values, now they will be put to different buckets, and old buckets must be sanitized + map.startWriting(); + for (int i = 0; i < initialNumOfBuckets; i++) { + // These two entries should end up in the same bucket, but they will be in + // different buckets after resize + map.put(Bytes.wrap(new byte[] {(byte) i}), i, i * 4L); + map.put(Bytes.wrap(new byte[] {(byte) (i + 100)}), i + initialNumOfBuckets, i + 4L); + } + map.endWriting(); + final LongList bucketIndex = (LongList) map.getBucketIndexToBucketLocation(); + for (int i = 0; i < initialNumOfBuckets; i++) { + final BufferedData bucketData = map.getFileCollection().readDataItemUsingIndex(bucketIndex, i); + assertNotNull(bucketData); + try (ParsedBucket bucket = new ParsedBucket()) { + bucket.readFrom(bucketData); + assertEquals(i, bucket.getBucketIndex()); + assertEquals(1, bucket.getBucketEntryCount()); + assertEquals(i * 4L, bucket.findValue(i, Bytes.wrap(new byte[] {(byte) i}), -1)); + assertEquals( + -1, + bucket.findValue(i + initialNumOfBuckets, Bytes.wrap(new byte[] {(byte) (i + 100)}), -1)); + } + } + for (int i = initialNumOfBuckets; i < initialNumOfBuckets * 2; i++) { + // The old index (before resize) + final int ei = i - initialNumOfBuckets; + final BufferedData bucketData = map.getFileCollection().readDataItemUsingIndex(bucketIndex, i); + assertNotNull(bucketData); + try (ParsedBucket bucket = new ParsedBucket()) { + bucket.readFrom(bucketData); + // The bucket now should have the new index + assertEquals(i, bucket.getBucketIndex()); + assertEquals(1, bucket.getBucketEntryCount()); + assertEquals(-1, bucket.findValue(ei, Bytes.wrap(new byte[] {(byte) ei}), -1)); + assertEquals( + ei + 4L, + bucket.findValue(ei + initialNumOfBuckets, Bytes.wrap(new byte[] {(byte) (ei + 100)}), -1)); + } + } + } + } + + @Test + void repairAfterResize() throws Exception { + // Map size is N * 2 + final int N = 250; + try (HalfDiskHashMap map = createNewTempMap("repairAfterResize", N); + final MemoryIndexDiskKeyValueStore kvStore = createNewTempKV("repairAfterResize", N * 4)) { + final int first = N * 2 - 1; + final int last = N * 4 - 2; + kvStore.updateValidKeyRange(first, last); + final int initialNumOfBuckets = calcExpectedNumOfBuckets(N); + map.startWriting(); + kvStore.startWriting(); + // Add N * 2 entities. 
Entities in each pair have hash codes with only one (high) bit different + for (int i = 0; i < N; i++) { + // KV 1 + final Bytes key1 = Bytes.wrap(intToByteArray(i + 1000)); + final int hash1 = i; + final long path1 = first + i; + map.put(key1, hash1, path1); + final VirtualLeafBytes rec1 = new VirtualLeafBytes(path1, key1, hash1, Bytes.wrap("" + i)); + kvStore.put(path1, rec1::writeTo, rec1.getSizeInBytes()); + // KV 2 + final Bytes key2 = Bytes.wrap(intToByteArray(i + 1001)); + final int hash2 = i + initialNumOfBuckets; + final long path2 = first + N + i; + map.put(key2, hash2, path2); + final VirtualLeafBytes rec2 = new VirtualLeafBytes(path2, key2, hash2, Bytes.wrap("" + i)); + kvStore.put(path2, rec2::writeTo, rec2.getSizeInBytes()); + } + map.endWriting(); + kvStore.endWriting(); + map.resizeIfNeeded(first, last); + map.repair(first, last, kvStore); + // Check that all data is still in the map + for (int i = 0; i < initialNumOfBuckets; i++) { + final Bytes key1 = Bytes.wrap(intToByteArray(i + 1000)); + final int hash1 = i; + final long path1 = first + i; + assertEquals(path1, map.get(key1, hash1, -1)); + final Bytes key2 = Bytes.wrap(intToByteArray(i + 1001)); + final int hash2 = i + initialNumOfBuckets; + final long path2 = first + N + i; + assertEquals(path2, map.get(key2, hash2, -1)); + } + } + } + + private int calcExpectedNumOfBuckets(final long mapSizeHint) { + int goodAverageBucketEntryCount = + CONFIGURATION.getConfigData(MerkleDbConfig.class).goodAverageBucketEntryCount(); + return Integer.highestOneBit(Math.toIntExact(mapSizeHint / goodAverageBucketEntryCount)) * 2; + } - assertEquals(1, map.get(testType.keySerializer.toBytes(key1), key1.hashCode(), -1)); - assertEquals(2, map.get(testType.keySerializer.toBytes(key2), key2.hashCode(), -1)); - assertEquals(3, map.get(testType.keySerializer.toBytes(key3), key3.hashCode(), -1)); - - final MemoryIndexDiskKeyValueStore kv = createNewTempKV(testType, 100); - kv.startWriting(); - kv.updateValidKeyRange(2, 4); - final VirtualLeafBytes rec2 = - new VirtualLeafBytes(2, testType.keySerializer.toBytes(key2), key2.hashCode(), Bytes.wrap("12")); - kv.put(2, rec2::writeTo, rec2.getSizeInBytes()); - final VirtualLeafBytes rec3 = - new VirtualLeafBytes(3, testType.keySerializer.toBytes(key3), key3.hashCode(), Bytes.wrap("13")); - kv.put(3, rec3::writeTo, rec3.getSizeInBytes()); - final VirtualLeafBytes rec4 = - new VirtualLeafBytes(4, testType.keySerializer.toBytes(key4), key4.hashCode(), Bytes.wrap("14")); - kv.put(4, rec4::writeTo, rec4.getSizeInBytes()); - kv.endWriting(); - - // key4 is missing in the map, it cannot be restored from pathToKeyValue store - assertThrows(IOException.class, () -> map.repair(2, 4, kv)); + private byte[] intToByteArray(final int i) { + return new byte[] {(byte) (i & 255), (byte) ((i >> 8) & 255), (byte) ((i >> 16) & 255), (byte) ((i >> 24) & 255) + }; } private static void printTestUpdate(long start, long count, String msg) { diff --git a/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/MerkleDbSnapshotTest.java b/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/MerkleDbSnapshotTest.java index 2cffaaaaee13..8c226fe1c8a9 100644 --- a/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/MerkleDbSnapshotTest.java +++ b/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/MerkleDbSnapshotTest.java @@ -102,10 +102,7 @@ public void afterTest() { private static MerkleDbTableConfig fixedConfig() { final MerkleDbConfig 
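For reference, the bucket counts asserted by the HalfDiskHashMap resize tests above follow from the goodAverageBucketEntryCount setting. Below is a minimal editorial sketch (not part of the change set) of that arithmetic; BucketCountSketch is a hypothetical class, and the value 32 is an assumed goodAverageBucketEntryCount matching the test configuration used here.

// Editorial sketch only: mirrors calcExpectedNumOfBuckets() in the test above.
final class BucketCountSketch {
    static int expectedNumOfBuckets(final long mapSizeHint, final int goodAverageBucketEntryCount) {
        // Round mapSizeHint / goodAverageBucketEntryCount down to a power of two, then double it
        return Integer.highestOneBit(Math.toIntExact(mapSizeHint / goodAverageBucketEntryCount)) * 2;
    }

    public static void main(final String[] args) {
        // With a size hint of 200 and (assumed) 32 entries per bucket: 200 / 32 = 6,
        // highestOneBit(6) = 4, doubled = 8 initial buckets (the "should be 8" comment above).
        System.out.println(expectedNumOfBuckets(200, 32)); // 8
        // After resizeIfNeeded(499, 998) the tests expect the bucket count to double, from 8 to 16.
        System.out.println(expectedNumOfBuckets(200, 32) * 2); // 16
    }
}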
diff --git a/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/MerkleDbSnapshotTest.java b/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/MerkleDbSnapshotTest.java
index 2cffaaaaee13..8c226fe1c8a9 100644
--- a/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/MerkleDbSnapshotTest.java
+++ b/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/MerkleDbSnapshotTest.java
@@ -102,10 +102,7 @@ public void afterTest() {
     private static MerkleDbTableConfig fixedConfig() {
         final MerkleDbConfig merkleDbConfig = CONFIGURATION.getConfigData(MerkleDbConfig.class);
         return new MerkleDbTableConfig(
-                (short) 1,
-                DigestType.SHA_384,
-                merkleDbConfig.maxNumOfKeys(),
-                merkleDbConfig.hashesRamToDiskThreshold());
+                (short) 1, DigestType.SHA_384, 1_000_000, merkleDbConfig.hashesRamToDiskThreshold());
     }
 
     private void verify(final MerkleInternal stateRoot) {
diff --git a/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/MerkleDbTest.java b/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/MerkleDbTest.java
index 05d6980e3815..f1d9d98a00b9 100644
--- a/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/MerkleDbTest.java
+++ b/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/MerkleDbTest.java
@@ -46,10 +46,7 @@ public void shutdownTest() {
     private static MerkleDbTableConfig fixedConfig() {
         final MerkleDbConfig merkleDbConfig = CONFIGURATION.getConfigData(MerkleDbConfig.class);
         return new MerkleDbTableConfig(
-                (short) 1,
-                DigestType.SHA_384,
-                merkleDbConfig.maxNumOfKeys(),
-                merkleDbConfig.hashesRamToDiskThreshold());
+                (short) 1, DigestType.SHA_384, 1_000_000, merkleDbConfig.hashesRamToDiskThreshold());
     }
 
     @Test
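The DataFileCollection test diffs that follow all make the same mechanical change: the old endWriting(minValidKey, maxValidKey) call is split into an explicit updateValidKeyRange(minValidKey, maxValidKey) followed by a no-argument endWriting(). Below is a minimal editorial sketch (not part of the change set) of the new call shape; WritableCollection and finishWriting are hypothetical, introduced only for illustration.

// Editorial sketch only: illustrates the call-pattern change applied in the tests below.
interface WritableCollection {
    void startWriting();
    void updateValidKeyRange(long minValidKey, long maxValidKey);
    void endWriting();
}

final class EndWritingMigrationSketch {
    // Old pattern: a single endWriting(minValidKey, maxValidKey) call set the range and finished writing.
    // New pattern, as used throughout the updated tests:
    static void finishWriting(final WritableCollection coll, final long minValidKey, final long maxValidKey) {
        coll.updateValidKeyRange(minValidKey, maxValidKey); // the valid key range is now set explicitly
        coll.endWriting(); // endWriting() no longer takes the range
    }
}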
diff --git a/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/files/DataFileCollectionCompactionTest.java b/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/files/DataFileCollectionCompactionTest.java
index f866c0dab828..c9369237d801 100644
--- a/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/files/DataFileCollectionCompactionTest.java
+++ b/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/files/DataFileCollectionCompactionTest.java
@@ -82,25 +82,29 @@ void testMerge() throws Exception {
         coll.startWriting();
         index.put(1L, storeDataItem(coll, new long[] {1, APPLE}));
         index.put(2L, storeDataItem(coll, new long[] {2, BANANA}));
-        coll.endWriting(1, 2);
+        coll.updateValidKeyRange(1, 2);
+        coll.endWriting();
 
         coll.startWriting();
         index.put(3L, storeDataItem(coll, new long[] {3, APPLE}));
         index.put(4L, storeDataItem(coll, new long[] {4, CHERRY}));
-        coll.endWriting(2, 4);
+        coll.updateValidKeyRange(2, 4);
+        coll.endWriting();
 
         coll.startWriting();
         index.put(4L, storeDataItem(coll, new long[] {4, CUTTLEFISH}));
         index.put(5L, storeDataItem(coll, new long[] {5, BANANA}));
         index.put(6L, storeDataItem(coll, new long[] {6, DATE}));
-        coll.endWriting(3, 6);
+        coll.updateValidKeyRange(3, 6);
+        coll.endWriting();
 
         coll.startWriting();
         index.put(7L, storeDataItem(coll, new long[] {7, APPLE}));
         index.put(8L, storeDataItem(coll, new long[] {8, EGGPLANT}));
         index.put(9L, storeDataItem(coll, new long[] {9, CUTTLEFISH}));
         index.put(10L, storeDataItem(coll, new long[] {10, FIG}));
-        coll.endWriting(5, 10);
+        coll.updateValidKeyRange(5, 10);
+        coll.endWriting();
 
         final CASableLongIndex indexUpdater = new CASableLongIndex() {
             public long get(long key) {
@@ -182,7 +186,8 @@ void testDoubleMerge() throws Exception {
             for (int j = 0; j < MAXKEYS; ++j) {
                 index[j] = storeDataItem(store, new long[] {j, i * j});
             }
-            store.endWriting(0, index.length);
+            store.updateValidKeyRange(0, index.length);
+            store.endWriting();
         }
 
         final CountDownLatch compactionAboutComplete = new CountDownLatch(1);
@@ -350,7 +355,8 @@ public boolean forEach(final LongAction action, Boolean
 
             // Finish writing the current copy, which has newer data but an older index than
            // the merged file
-            store.endWriting(0, index.length());
+            store.updateValidKeyRange(0, index.length());
+            store.endWriting();
         }
 
         // Validate the result
@@ -380,7 +386,8 @@ void testRestore() throws Exception {
         for (int j = 0; j < MAX_KEYS; ++j) {
             index.set(j, storeDataItem(store, new long[] {j, j}));
         }
-        store.endWriting(0, index.length());
+        store.updateValidKeyRange(0, index.length());
+        store.endWriting();
 
         // Write new copies
         for (long i = 1; i < NUM_UPDATES; i++) {
@@ -424,7 +431,8 @@ public boolean forEach(final LongAction action, Boolean
 
             // Finish writing the current copy, which has newer data but an older index than
            // the merged file
-            store.endWriting(0, index.length());
+            store.updateValidKeyRange(0, index.length());
+            store.endWriting();
         }
 
         // Restore from all files
@@ -472,7 +480,8 @@ void testMergeUpdateSnapshotRestore(final int testParam) throws Throwable {
                final long dataLocation = storeDataItem(store, new long[] {i * numValues + j, i * numValues + j});
                index.put(i * numValues + j, dataLocation);
            }
-            store.endWriting(0, index.size());
+            store.updateValidKeyRange(0, index.size());
+            store.endWriting();
        }
        // Start compaction
        // Test scenario 0: start merging with mergingPaused semaphore locked, so merging
@@ -525,7 +534,8 @@ void testMergeUpdateSnapshotRestore(final int testParam) throws Throwable {
                    index.put(i * numValues + j, dataLocation);
                }
            }
-            store.endWriting(0, index.size());
+            store.updateValidKeyRange(0, index.size());
+            store.endWriting();
            newFileWriteCompleteLatch.countDown();
            // Test scenario 2: lock the semaphore just before taking a snapshot. Compaction may still
            // be
@@ -591,7 +601,8 @@ void testInconsistentIndex() throws Exception {
            for (int j = 0; j < MAXKEYS; ++j) {
                index.put(j, storeDataItem(store, new long[] {j, i * j}));
            }
-            store.endWriting(0, index.size());
+            store.updateValidKeyRange(0, index.size());
+            store.endWriting();
        }
 
        final Path snapshot = testDir.resolve("snapshot");
diff --git a/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/files/DataFileCollectionTest.java b/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/files/DataFileCollectionTest.java
index 02e934effaea..f006dedb60dc 100644
--- a/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/files/DataFileCollectionTest.java
+++ b/platform-sdk/swirlds-merkledb/src/timingSensitive/java/com/swirlds/merkledb/files/DataFileCollectionTest.java
@@ -153,7 +153,8 @@ void createDataFileCollection(FilesTestType testType) throws Exception {
                // store in file
                storedOffsets.put(i, storeDataItem(fileCollection, dataValue));
            }
-            final DataFileReader newFile = fileCollection.endWriting(0, count + 100);
+            fileCollection.updateValidKeyRange(0, count + 100);
+            final DataFileReader newFile = fileCollection.endWriting();
            assertEquals(new KeyRange(0, count + 100), fileCollection.getValidKeyRange(), "Range should be this");
            assertEquals(Files.size(newFile.getPath()), newFile.getSize());
            count += 100;
@@ -457,6 +458,7 @@ void check1000AfterMerge(final FilesTestType testType) {
    @EnumSource(FilesTestType.class)
    void changeSomeData(final FilesTestType testType) throws Exception {
        final DataFileCollection fileCollection = fileCollectionMap.get(testType);
+        fileCollection.updateValidKeyRange(0, 1000);
        final LongListHeap storedOffsets = storedOffsetsMap.get(testType);
        fileCollection.startWriting();
        // put in 1000 items
@@ -474,7 +476,7 @@ void changeSomeData(final FilesTestType testType) throws Exception {
            // store in file
            storedOffsets.put(i, storeDataItem(fileCollection, dataValue));
        }
-        fileCollection.endWriting(0, 1000);
+        fileCollection.endWriting();
        // check we now have 2 files
        try (Stream<Path> list = Files.list(tempFileDir.resolve(testType.name()))) {
            assertEquals(
@@ -712,7 +714,8 @@ private static void populateDataFileCollection(
                // store in file
                storedOffsets.put(i, storeDataItem(fileCollection, dataValue));
            }
-            fileCollection.endWriting(0, count + 100);
+            fileCollection.updateValidKeyRange(0, count + 100);
+            fileCollection.endWriting();
            count += 100;
        }
    }
diff --git a/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/reconnect/RandomVirtualMapReconnectTests.java b/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/reconnect/RandomVirtualMapReconnectTests.java
index 7467d55fa5bb..47342ad241ee 100644
--- a/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/reconnect/RandomVirtualMapReconnectTests.java
+++ b/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/reconnect/RandomVirtualMapReconnectTests.java
@@ -67,10 +67,7 @@ protected VirtualDataSourceBuilder createBuilder() throws IOException {
        MerkleDb.setDefaultPath(defaultVirtualMapPath);
        final MerkleDbConfig merkleDbConfig = CONFIGURATION.getConfigData(MerkleDbConfig.class);
        final MerkleDbTableConfig tableConfig = new MerkleDbTableConfig(
-                (short) 1,
-                DigestType.SHA_384,
-                merkleDbConfig.maxNumOfKeys(),
-                merkleDbConfig.hashesRamToDiskThreshold());
+                (short) 1, DigestType.SHA_384, 1_000_000, merkleDbConfig.hashesRamToDiskThreshold());
        return new MerkleDbDataSourceBuilder(tableConfig, CONFIGURATION);
    }
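The table-config helpers touched above (MerkleDbSnapshotTest, MerkleDbTest, RandomVirtualMapReconnectTests) all switch from merkleDbConfig.maxNumOfKeys() to an explicit 1_000_000 key count. Below is a minimal editorial sketch (not part of the change set) of the resulting helper shape; TableConfigSketch is hypothetical, imports are omitted, and the types are the same ones these tests already use (Configuration, MerkleDbConfig, MerkleDbTableConfig, DigestType).

// Editorial sketch only: the common shape of the updated fixedConfig()/tableConfig code above.
final class TableConfigSketch {
    static MerkleDbTableConfig fixedConfig(final Configuration configuration) {
        final MerkleDbConfig merkleDbConfig = configuration.getConfigData(MerkleDbConfig.class);
        // The key count (previously merkleDbConfig.maxNumOfKeys()) is now an explicit 1_000_000;
        // the hashes RAM-to-disk threshold still comes from configuration.
        return new MerkleDbTableConfig(
                (short) 1, DigestType.SHA_384, 1_000_000, merkleDbConfig.hashesRamToDiskThreshold());
    }
}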