Skip to content

Commit

Permalink
Flush and balance persist writes
Browse files Browse the repository at this point in the history
  • Loading branch information
asdacap committed Oct 2, 2024
1 parent 5d33478 commit 6815b86
Showing 1 changed file with 39 additions and 22 deletions.
61 changes: 39 additions & 22 deletions src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Core.Threading;
using Nethermind.Logging;

namespace Nethermind.Trie.Pruning
Expand All @@ -40,17 +39,6 @@ public class TrieStore : ITrieStore, IPruningTrieStore
// This seems to attempt to prevent the dirty nodes from being mutated by another thread at the same time.
private object _dirtyNodesWriteLock = new object();

/*
private TrieStoreDirtyNodesCache DirtyNodes => _dirtyNodes ?? CreateCacheAtomic(ref _dirtyNodes);
[MethodImpl(MethodImplOptions.NoInlining)]
private TrieStoreDirtyNodesCache CreateCacheAtomic(ref TrieStoreDirtyNodesCache val)
{
TrieStoreDirtyNodesCache instance = new(this, _pruningStrategy.TrackedPastKeyCount, !_nodeStorage.RequirePath, _logger);
TrieStoreDirtyNodesCache? prior = Interlocked.CompareExchange(ref val, instance, null);
return prior ?? instance;
}
*/

private bool _livePruningEnabled = false;

private bool _lastPersistedReachedReorgBoundary;
Expand Down Expand Up @@ -490,6 +478,12 @@ public void Prune()
{
try
{
// Flush ahead of time so that the memtable is empty, which prevents stalling when writing nodes.
// Note: WriteBufferSize * WriteBufferNumber needs to be more than about 20% of the pruning cache;
// otherwise, the whole dirty cache may not fit.
// Additionally, if (WriteBufferSize * (WriteBufferNumber - 1)) is already more than 20% of the pruning
// cache, there is likely enough space for it most of the time, except perhaps while syncing.
_nodeStorage.Flush();
lock (_dirtyNodesLock)
{
lock (_dirtyNodesWriteLock)
Expand Down Expand Up @@ -604,7 +598,6 @@ private bool SaveSnapshot()
{
BlockCommitSet blockCommitSet = candidateSets[index];
if (_logger.IsDebug) _logger.Debug($"Elevated pruning for candidate {blockCommitSet.BlockNumber}");
// TODO: Parallelize this
ParallelPersistBlockCommitSet(null, blockCommitSet, persistedNodeRecorder);
}

Expand Down Expand Up @@ -825,16 +818,31 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path)

Stopwatch stopwatch = Stopwatch.StartNew();

// The first call recursive stop at two level, yielding 256 leaf in parallelStartNodes, which is run concurrently
// The first CallRecursive stops at two levels, yielding 256 nodes in parallelStartNodes, which are run concurrently
TreePath path = TreePath.Empty;
commitSet.Root?.CallRecursively(TopLevelPersist, address, ref path, GetTrieStore(null), true, _logger, maxPathLength: parallelBoundaryPathLength);

// The amount of change in the subtrees is not balanced at all, so their writes are buffered here
// and get disposed in parallel instead of being disposed in `PersistNodeStartingFrom`.
BlockingCollection<INodeStorage.WriteBatch> disposeQueue = new BlockingCollection<INodeStorage.WriteBatch>(4);

Task disposeTask = Task.WhenAll(Enumerable.Range(0, Environment.ProcessorCount).Select((_) => Task.Run(() =>
{
while (disposeQueue.TryTake(out INodeStorage.WriteBatch disposable, Timeout.Infinite))
{
disposable.Dispose();
}
})).ToArray());

Task.WaitAll(parallelStartNodes.Select((entry) => Task.Run(() =>
{
(TrieNode trieNode, Hash256? address2, TreePath path2) = entry;
PersistNodeStartingFrom(trieNode, address2, path2, commitSet, persistedNodeRecorder, writeFlags);
PersistNodeStartingFrom(trieNode, address2, path2, commitSet, persistedNodeRecorder, writeFlags, disposeQueue);
})).ToArray());

disposeQueue.CompleteAdding();
disposeTask.Wait();

// Dispose the top level last so that, in case something goes wrong, at least the root won't be stored
topLevelWriteBatch.Dispose();

Expand All @@ -846,19 +854,28 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path)
LastPersistedBlockNumber = commitSet.BlockNumber;
}

private void PersistNodeStartingFrom(TrieNode tn, Hash256 address2, TreePath path, BlockCommitSet commitSet, Action<TreePath, Hash256?, TrieNode>? persistedNodeRecorder,
WriteFlags writeFlags)
private void PersistNodeStartingFrom(TrieNode tn, Hash256 address2, TreePath path, BlockCommitSet commitSet,
Action<TreePath, Hash256?, TrieNode>? persistedNodeRecorder,
WriteFlags writeFlags, BlockingCollection<INodeStorage.WriteBatch> disposeQueue)
{
long persistedNodeCount = 0;
INodeStorage.WriteBatch writeBatch = _nodeStorage.StartWriteBatch();

using INodeStorage.WriteBatch writeBatch = _nodeStorage.StartWriteBatch();

void NewFunction(TrieNode node, Hash256? address3, TreePath path2)
void DoPersist(TrieNode node, Hash256? address3, TreePath path2)
{
persistedNodeRecorder?.Invoke(path2, address3, node);
this.PersistNode(address3, path2, node, commitSet.BlockNumber, writeFlags, writeBatch);
PersistNode(address3, path2, node, commitSet.BlockNumber, writeFlags, writeBatch);

persistedNodeCount++;
if (persistedNodeCount % 512 == 0)
{
disposeQueue.Add(writeBatch);
writeBatch = _nodeStorage.StartWriteBatch();
}
}

tn.CallRecursively(NewFunction, address2, ref path, GetTrieStore(address2), true, _logger);
tn.CallRecursively(DoPersist, address2, ref path, GetTrieStore(address2), true, _logger);
disposeQueue.Add(writeBatch);
}

private void PersistNode(Hash256? address, in TreePath path, TrieNode currentNode, long blockNumber, WriteFlags writeFlags = WriteFlags.None, INodeStorage.WriteBatch? writeBatch = null)
Expand Down

0 comments on commit 6815b86

Please sign in to comment.