@@ -1617,7 +1617,7 @@ LedgerManagerImpl::collectEntries(AppConnector& app, AbstractLedgerTxn& ltx,
1617
1617
return entryMap;
1618
1618
}
1619
1619
1620
- void
1620
+ RestoredKeys
1621
1621
LedgerManagerImpl::applyThread (AppConnector& app, ThreadEntryMap& entryMap,
1622
1622
Thread const & thread, Config const & config,
1623
1623
SorobanNetworkConfig const & sorobanConfig,
@@ -1629,6 +1629,7 @@ LedgerManagerImpl::applyThread(AppConnector& app, ThreadEntryMap& entryMap,
1629
1629
// we accumulate the RO extensions until we need to apply them.
1630
1630
UnorderedMap<LedgerKey, uint32_t > readOnlyTtlExtensions;
1631
1631
1632
+ RestoredKeys threadRestoredKeys;
1632
1633
for (auto const & txBundle : thread)
1633
1634
{
1634
1635
releaseAssertOrThrow (txBundle.getResPayload ());
@@ -1676,7 +1677,7 @@ LedgerManagerImpl::applyThread(AppConnector& app, ThreadEntryMap& entryMap,
1676
1677
txBundle.getResPayload (), sorobanMetrics, txSubSeed,
1677
1678
txBundle.getEffects ());
1678
1679
1679
- if (res.mSuccess )
1680
+ if (res.getSuccess () )
1680
1681
{
1681
1682
UnorderedMap<LedgerKey, bool > isReadOnlyTTLMap;
1682
1683
for (auto const & ro :
@@ -1699,7 +1700,7 @@ LedgerManagerImpl::applyThread(AppConnector& app, ThreadEntryMap& entryMap,
1699
1700
}
1700
1701
1701
1702
// now apply the entry changes to entryMap
1702
- for (auto const & entry : res.mModifiedEntryMap )
1703
+ for (auto const & entry : res.getModifiedEntryMap () )
1703
1704
{
1704
1705
auto const & lk = entry.first ;
1705
1706
auto const & updatedLe = entry.second ;
@@ -1738,11 +1739,24 @@ LedgerManagerImpl::applyThread(AppConnector& app, ThreadEntryMap& entryMap,
1738
1739
it->second = {updatedLe, true };
1739
1740
}
1740
1741
}
1742
+
1743
+ for (auto const & key : res.getRestoredKeys ().hotArchive )
1744
+ {
1745
+ auto [_, inserted] = threadRestoredKeys.hotArchive .emplace (key);
1746
+ releaseAssertOrThrow (inserted);
1747
+ }
1748
+ for (auto const & key : res.getRestoredKeys ().liveBucketList )
1749
+ {
1750
+ auto [_, inserted] =
1751
+ threadRestoredKeys.liveBucketList .emplace (key);
1752
+ releaseAssertOrThrow (inserted);
1753
+ }
1741
1754
}
1742
1755
else
1743
1756
{
1744
1757
releaseAssertOrThrow (!txBundle.getResPayload ()->isSuccess ());
1745
1758
}
1759
+ return threadRestoredKeys;
1746
1760
}
1747
1761
1748
1762
for (auto kvp : readOnlyTtlExtensions)
@@ -1798,6 +1812,8 @@ LedgerManagerImpl::applySorobanStage(AppConnector& app, AbstractLedgerTxn& ltx,
1798
1812
entryMapsByThread.emplace_back (collectEntries (app, ltx, thread));
1799
1813
}
1800
1814
1815
+ std::vector<std::future<RestoredKeys>> threadRestoredKeyFutures;
1816
+
1801
1817
auto ledgerInfo = getParallelLedgerInfo (app, ltx);
1802
1818
std::vector<std::thread> threads;
1803
1819
for (size_t i = 0 ; i < stage.size (); ++i)
@@ -1806,19 +1822,51 @@ LedgerManagerImpl::applySorobanStage(AppConnector& app, AbstractLedgerTxn& ltx,
1806
1822
1807
1823
auto const & thread = stage.at (i);
1808
1824
1809
- threads.emplace_back (&LedgerManagerImpl::applyThread, this ,
1825
+ /* threads.emplace_back(&LedgerManagerImpl::applyThread, this,
1810
1826
std::ref(app), std::ref(entryMapByThread),
1811
1827
std::ref(thread), config, sorobanConfig,
1812
1828
std::ref(ledgerInfo), sorobanBasePrngSeed,
1813
1829
std::ref(sorobanMetrics));
1830
+ */
1831
+ threadRestoredKeyFutures.emplace_back (std::async (
1832
+ std::launch::async, &LedgerManagerImpl::applyThread, this ,
1833
+ std::ref (app), std::ref (entryMapByThread), std::ref (thread), config,
1834
+ sorobanConfig, std::ref (ledgerInfo), sorobanBasePrngSeed,
1835
+ std::ref (sorobanMetrics)));
1814
1836
}
1815
1837
1816
1838
for (auto & thread : threads)
1817
1839
{
1818
1840
thread.join ();
1819
1841
}
1820
1842
1843
+ std::vector<RestoredKeys> threadRestoredKeys;
1844
+ for (auto & restoredKeysFutures : threadRestoredKeyFutures)
1845
+ {
1846
+ threadRestoredKeys.emplace_back (restoredKeysFutures.get ());
1847
+ }
1848
+
1821
1849
LedgerTxn ltxInner (ltx);
1850
+ for (auto const & restoredKeys : threadRestoredKeys)
1851
+ {
1852
+ // We filter out the TTL keys here because LedgerTxn will add the ttl
1853
+ // key
1854
+ for (auto const & key : restoredKeys.hotArchive )
1855
+ {
1856
+ if (key.type () != TTL)
1857
+ {
1858
+ ltxInner.addRestoredFromHotArchiveKey (key);
1859
+ }
1860
+ }
1861
+ for (auto const & key : restoredKeys.liveBucketList )
1862
+ {
1863
+ if (key.type () != TTL)
1864
+ {
1865
+ ltxInner.addRestoredFromLiveBucketListKey (key);
1866
+ }
1867
+ }
1868
+ }
1869
+
1822
1870
for (auto const & thread : stage)
1823
1871
{
1824
1872
for (auto const & txBundle : thread)
@@ -1886,6 +1934,11 @@ LedgerManagerImpl::applySorobanStage(AppConnector& app, AbstractLedgerTxn& ltx,
1886
1934
auto newLiveUntil =
1887
1935
updatedEntry.data .ttl ().liveUntilLedgerSeq ;
1888
1936
1937
+ // The only scenario where we accept a reduction in TTL
1938
+ // is one where an entry was deleted, and then
1939
+ // recreated. This can only happen if the key was in the
1940
+ // readWrite set, so if it's not then this is just a
1941
+ // parallel readOnly bump that we can ignore here.
1889
1942
if (newLiveUntil <= currLiveUntil &&
1890
1943
isInReadWriteSet.count (entry.first ) == 0 )
1891
1944
{
0 commit comments