@@ -1617,7 +1617,7 @@ LedgerManagerImpl::collectEntries(AppConnector& app, AbstractLedgerTxn& ltx,
1617
1617
return entryMap;
1618
1618
}
1619
1619
1620
- void
1620
+ RestoredKeys
1621
1621
LedgerManagerImpl::applyThread (AppConnector& app, ThreadEntryMap& entryMap,
1622
1622
Thread const & thread, Config const & config,
1623
1623
SorobanNetworkConfig const & sorobanConfig,
@@ -1629,6 +1629,7 @@ LedgerManagerImpl::applyThread(AppConnector& app, ThreadEntryMap& entryMap,
1629
1629
// we accumulate the RO extensions until we need to apply them.
1630
1630
UnorderedMap<LedgerKey, uint32_t > readOnlyTtlExtensions;
1631
1631
1632
+ RestoredKeys threadRestoredKeys;
1632
1633
for (auto const & txBundle : thread)
1633
1634
{
1634
1635
releaseAssertOrThrow (txBundle.getResPayload ());
@@ -1676,7 +1677,7 @@ LedgerManagerImpl::applyThread(AppConnector& app, ThreadEntryMap& entryMap,
1676
1677
txBundle.getResPayload (), sorobanMetrics, txSubSeed,
1677
1678
txBundle.getEffects ());
1678
1679
1679
- if (res.mSuccess )
1680
+ if (res.getSuccess () )
1680
1681
{
1681
1682
UnorderedMap<LedgerKey, bool > isReadOnlyTTLMap;
1682
1683
for (auto const & ro :
@@ -1699,7 +1700,7 @@ LedgerManagerImpl::applyThread(AppConnector& app, ThreadEntryMap& entryMap,
1699
1700
}
1700
1701
1701
1702
// now apply the entry changes to entryMap
1702
- for (auto const & entry : res.mModifiedEntryMap )
1703
+ for (auto const & entry : res.getModifiedEntryMap () )
1703
1704
{
1704
1705
auto const & lk = entry.first ;
1705
1706
auto const & updatedLe = entry.second ;
@@ -1738,6 +1739,18 @@ LedgerManagerImpl::applyThread(AppConnector& app, ThreadEntryMap& entryMap,
1738
1739
it->second = {updatedLe, true };
1739
1740
}
1740
1741
}
1742
+
1743
+ for (auto const & key : res.getRestoredKeys ().hotArchive )
1744
+ {
1745
+ auto [_, inserted] = threadRestoredKeys.hotArchive .emplace (key);
1746
+ releaseAssertOrThrow (inserted);
1747
+ }
1748
+ for (auto const & key : res.getRestoredKeys ().liveBucketList )
1749
+ {
1750
+ auto [_, inserted] =
1751
+ threadRestoredKeys.liveBucketList .emplace (key);
1752
+ releaseAssertOrThrow (inserted);
1753
+ }
1741
1754
}
1742
1755
else
1743
1756
{
@@ -1761,6 +1774,8 @@ LedgerManagerImpl::applyThread(AppConnector& app, ThreadEntryMap& entryMap,
1761
1774
it->second .isDirty = true ;
1762
1775
}
1763
1776
}
1777
+
1778
+ return threadRestoredKeys;
1764
1779
}
1765
1780
1766
1781
ParallelLedgerInfo
@@ -1798,6 +1813,8 @@ LedgerManagerImpl::applySorobanStage(AppConnector& app, AbstractLedgerTxn& ltx,
1798
1813
entryMapsByThread.emplace_back (collectEntries (app, ltx, thread));
1799
1814
}
1800
1815
1816
+ std::vector<std::future<RestoredKeys>> threadRestoredKeyFutures;
1817
+
1801
1818
auto ledgerInfo = getParallelLedgerInfo (app, ltx);
1802
1819
std::vector<std::thread> threads;
1803
1820
for (size_t i = 0 ; i < stage.size (); ++i)
@@ -1806,19 +1823,44 @@ LedgerManagerImpl::applySorobanStage(AppConnector& app, AbstractLedgerTxn& ltx,
1806
1823
1807
1824
auto const & thread = stage.at (i);
1808
1825
1809
- threads .emplace_back (&LedgerManagerImpl::applyThread, this ,
1810
- std::ref (app), std::ref (entryMapByThread) ,
1811
- std::ref (thread), config, sorobanConfig ,
1812
- std::ref (ledgerInfo), sorobanBasePrngSeed,
1813
- std::ref (sorobanMetrics));
1826
+ threadRestoredKeyFutures .emplace_back (std::async (
1827
+ std::launch::async, &LedgerManagerImpl::applyThread, this ,
1828
+ std::ref (app), std::ref (entryMapByThread), std::ref (thread), config,
1829
+ sorobanConfig, std::ref (ledgerInfo), sorobanBasePrngSeed,
1830
+ std::ref (sorobanMetrics) ));
1814
1831
}
1815
1832
1816
1833
for (auto & thread : threads)
1817
1834
{
1818
1835
thread.join ();
1819
1836
}
1820
1837
1838
+ std::vector<RestoredKeys> threadRestoredKeys;
1839
+ for (auto & restoredKeysFutures : threadRestoredKeyFutures)
1840
+ {
1841
+ threadRestoredKeys.emplace_back (restoredKeysFutures.get ());
1842
+ }
1843
+
1821
1844
LedgerTxn ltxInner (ltx);
1845
+ for (auto const & restoredKeys : threadRestoredKeys)
1846
+ {
1847
+ // We filter out the TTL keys here because LedgerTxn will add the ttl
1848
+ // key
1849
+ for (auto const & key : restoredKeys.hotArchive )
1850
+ {
1851
+ if (key.type () != TTL)
1852
+ {
1853
+ ltxInner.addRestoredFromHotArchiveKey (key);
1854
+ }
1855
+ }
1856
+ for (auto const & key : restoredKeys.liveBucketList )
1857
+ {
1858
+ if (key.type () != TTL)
1859
+ {
1860
+ ltxInner.addRestoredFromLiveBucketListKey (key);
1861
+ }
1862
+ }
1863
+ }
1822
1864
for (auto const & thread : stage)
1823
1865
{
1824
1866
for (auto const & txBundle : thread)
@@ -1886,6 +1928,12 @@ LedgerManagerImpl::applySorobanStage(AppConnector& app, AbstractLedgerTxn& ltx,
1886
1928
auto newLiveUntil =
1887
1929
updatedEntry.data .ttl ().liveUntilLedgerSeq ;
1888
1930
1931
+
1932
+ // The only scenario where we accept a reduction in TTL
1933
+ // is one where an entry was deleted, and then
1934
+ // recreated. This can only happen if the key was in the
1935
+ // readWrite set, so if it's not then this is just a
1936
+ // parallel readOnly bump that we can ignore here.
1889
1937
if (newLiveUntil <= currLiveUntil &&
1890
1938
isInReadWriteSet.count (entry.first ) == 0 )
1891
1939
{
0 commit comments