Skip to content

Commit

Permalink
Recheck if empty under lock, early exit (#7442)
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams authored Sep 18, 2024
1 parent ec4a833 commit 55aa224
Show file tree
Hide file tree
Showing 14 changed files with 66 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@

namespace Nethermind.Consensus.Processing;

public class BlockCachePreWarmer(ReadOnlyTxProcessingEnvFactory envFactory, ISpecProvider specProvider, ILogManager logManager, IWorldState? targetWorldState = null) : IBlockCachePreWarmer
public sealed class BlockCachePreWarmer(ReadOnlyTxProcessingEnvFactory envFactory, ISpecProvider specProvider, ILogManager logManager, PreBlockCaches? preBlockCaches = null) : IBlockCachePreWarmer
{
private readonly ObjectPool<IReadOnlyTxProcessorSource> _envPool = new DefaultObjectPool<IReadOnlyTxProcessorSource>(new ReadOnlyTxProcessingEnvPooledObjectPolicy(envFactory), Environment.ProcessorCount);
private readonly ObjectPool<SystemTransaction> _systemTransactionPool = new DefaultObjectPool<SystemTransaction>(new DefaultPooledObjectPolicy<SystemTransaction>(), Environment.ProcessorCount);
private readonly ILogger _logger = logManager.GetClassLogger<BlockCachePreWarmer>();

public Task PreWarmCaches(Block suggestedBlock, Hash256? parentStateRoot, AccessList? systemTxAccessList, CancellationToken cancellationToken = default)
{
if (targetWorldState is not null)
if (preBlockCaches is not null)
{
if (targetWorldState.ClearCache())
if (preBlockCaches.ClearCaches())
{
if (_logger.IsWarn) _logger.Warn("Caches are not empty. Clearing them.");
}
Expand All @@ -40,10 +40,10 @@ public Task PreWarmCaches(Block suggestedBlock, Hash256? parentStateRoot, Access
ParallelOptions parallelOptions = new() { MaxDegreeOfParallelism = physicalCoreCount - 1, CancellationToken = cancellationToken };

// Run address warmer ahead of transactions warmer, but queue to ThreadPool so it doesn't block the txs
ThreadPool.UnsafeQueueUserWorkItem(
new AddressWarmer(parallelOptions, suggestedBlock, parentStateRoot, systemTxAccessList, this), preferLocal: false);
var addressWarmer = new AddressWarmer(parallelOptions, suggestedBlock, parentStateRoot, systemTxAccessList, this);
ThreadPool.UnsafeQueueUserWorkItem(addressWarmer, preferLocal: false);
// Do not pass cancellation token to the task, we don't want exceptions to be thrown in main processing thread
return Task.Run(() => PreWarmCachesParallel(suggestedBlock, parentStateRoot, parallelOptions, cancellationToken));
return Task.Run(() => PreWarmCachesParallel(suggestedBlock, parentStateRoot, parallelOptions, addressWarmer, cancellationToken));
}
}

Expand All @@ -53,11 +53,9 @@ public Task PreWarmCaches(Block suggestedBlock, Hash256? parentStateRoot, Access
// Parent state root is null for genesis block
private static bool IsGenesisBlock(Hash256? parentStateRoot) => parentStateRoot is null;

public void ClearCaches() => targetWorldState?.ClearCache();
public bool ClearCaches() => preBlockCaches?.ClearCaches() ?? false;

public Task ClearCachesInBackground() => targetWorldState?.ClearCachesInBackground() ?? Task.CompletedTask;

private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot, ParallelOptions parallelOptions, CancellationToken cancellationToken)
private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot, ParallelOptions parallelOptions, AddressWarmer addressWarmer, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;

Expand All @@ -75,6 +73,11 @@ private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot
{
if (_logger.IsDebug) _logger.Debug($"Pre-warming caches cancelled for block {suggestedBlock.Number}.");
}
finally
{
// Don't compete task until address warmer is also done.
addressWarmer.Wait();
}
}

private void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spec, Block block, Hash256 stateRoot)
Expand Down Expand Up @@ -152,27 +155,32 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp
private class AddressWarmer(ParallelOptions parallelOptions, Block block, Hash256 stateRoot, AccessList? systemTxAccessList, BlockCachePreWarmer preWarmer)
: IThreadPoolWorkItem
{
private readonly ParallelOptions ParallelOptions = parallelOptions;
private readonly Block Block = block;
private readonly Hash256 StateRoot = stateRoot;
private readonly BlockCachePreWarmer PreWarmer = preWarmer;
private readonly AccessList? SystemTxAccessList = systemTxAccessList;
private readonly ManualResetEventSlim _doneEvent = new(initialState: false);

public void Wait() => _doneEvent.Wait();

void IThreadPoolWorkItem.Execute()
{
IReadOnlyTxProcessorSource env = PreWarmer._envPool.Get();
IReadOnlyTxProcessorSource env = null;
try
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;
env = PreWarmer._envPool.Get();
using IReadOnlyTxProcessingScope scope = env.Build(StateRoot);
WarmupAddresses(ParallelOptions, Block, scope);
WarmupAddresses(parallelOptions, Block, scope);
}
catch (Exception ex)
{
if (PreWarmer._logger.IsDebug) PreWarmer._logger.Error($"Error pre-warming addresses", ex);
}
finally
{
PreWarmer._envPool.Return(env);
if (env is not null) PreWarmer._envPool.Return(env);
_doneEvent.Set();
}
}

Expand Down
35 changes: 28 additions & 7 deletions src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ public partial class BlockProcessor(
private readonly IRewardCalculator _rewardCalculator = rewardCalculator ?? throw new ArgumentNullException(nameof(rewardCalculator));
private readonly IBlockProcessor.IBlockTransactionsExecutor _blockTransactionsExecutor = blockTransactionsExecutor ?? throw new ArgumentNullException(nameof(blockTransactionsExecutor));
private readonly IBlockhashStore _blockhashStore = blockHashStore ?? throw new ArgumentNullException(nameof(blockHashStore));
private Task _clearTask = Task.CompletedTask;
private const int MaxUncommittedBlocks = 64;
private readonly Func<Task, Task> _clearCaches = _ => preWarmer.ClearCachesInBackground();
private readonly Action<Task> _clearCaches = _ => preWarmer?.ClearCaches();

/// <summary>
/// We use a single receipt tracer for all blocks. Internally receipt tracer forwards most of the calls
Expand Down Expand Up @@ -93,10 +94,13 @@ the previous head state.*/
int blocksCount = suggestedBlocks.Count;
Block[] processedBlocks = new Block[blocksCount];

Task? preWarmTask = null;
try
{
for (int i = 0; i < blocksCount; i++)
{
preWarmTask = null;
WaitForCacheClear();
Block suggestedBlock = suggestedBlocks[i];
if (blocksCount > 64 && i % 8 == 0)
{
Expand All @@ -111,28 +115,32 @@ the previous head state.*/
Block processedBlock;
TxReceipt[] receipts;

Task? preWarmTask = null;
bool skipPrewarming = preWarmer is null || suggestedBlock.Transactions.Length < 3;
if (!skipPrewarming)
{
using CancellationTokenSource cancellationTokenSource = new();
(_, AccessList? accessList) = _beaconBlockRootHandler.BeaconRootsAccessList(suggestedBlock, _specProvider.GetSpec(suggestedBlock.Header));
preWarmTask = preWarmer.PreWarmCaches(suggestedBlock, preBlockStateRoot, accessList, cancellationTokenSource.Token);

(processedBlock, receipts) = ProcessOne(suggestedBlock, options, blockTracer);
// Block is processed, we can cancel the prewarm task
preWarmTask = preWarmTask.ContinueWith(_clearCaches).Unwrap();
cancellationTokenSource.Cancel();
}
else
{
if (preWarmer?.ClearCaches() ?? false)
{
if (_logger.IsWarn) _logger.Warn("Low txs, caches are not empty. Clearing them.");
}
// Even though we skip prewarming we still need to ensure the caches are cleared
(processedBlock, receipts) = ProcessOne(suggestedBlock, options, blockTracer);
}

processedBlocks[i] = processedBlock;

// be cautious here as AuRa depends on processing
PreCommitBlock(newBranchStateRoot, suggestedBlock.Number);
QueueClearCaches(preWarmer, preWarmTask);

if (notReadOnly)
{
BlockProcessed?.Invoke(this, new BlockProcessedEventArgs(processedBlock, receipts));
Expand Down Expand Up @@ -166,13 +174,26 @@ the previous head state.*/
}
catch (Exception ex) // try to restore at all cost
{
_logger.Trace($"Encountered exception {ex} while processing blocks.");
if (_logger.IsWarn) _logger.Warn($"Encountered exception {ex} while processing blocks.");
QueueClearCaches(preWarmer, preWarmTask);
preWarmTask?.GetAwaiter().GetResult();
RestoreBranch(previousBranchStateRoot);
throw;
}
finally
}

private void WaitForCacheClear() => _clearTask.GetAwaiter().GetResult();

private void QueueClearCaches(IBlockCachePreWarmer preWarmer, Task? preWarmTask)
{
if (preWarmTask is not null)
{
// Can start clearing caches in background
_clearTask = preWarmTask.ContinueWith(_clearCaches, TaskContinuationOptions.RunContinuationsAsynchronously);
}
else if (preWarmer is not null)
{
preWarmer?.ClearCaches();
_clearTask = Task.Run(() => preWarmer.ClearCaches());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ void DeleteInvalidBlocks(in ProcessingBranch processingBranch, Hash256 invalidBl
}
catch (InvalidBlockException ex)
{
if (_logger.IsWarn) _logger.Warn($"Issue processing block {ex.InvalidBlock} {ex}");
invalidBlockHash = ex.InvalidBlock.Hash;
error = ex.Message;
Block? invalidBlock = processingBranch.BlocksToProcess.FirstOrDefault(b => b.Hash == invalidBlockHash);
Expand Down Expand Up @@ -541,10 +542,6 @@ void DeleteInvalidBlocks(in ProcessingBranch processingBranch, Hash256 invalidBl
new GethLikeBlockMemoryTracer(GethTraceOptions.Default),
DumpOptions.Geth);
}
else
{
if (_logger.IsError) _logger.Error($"Unexpected situation occurred during the handling of an invalid block {ex.InvalidBlock}", ex);
}

processedBlocks = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,5 @@ namespace Nethermind.Consensus.Processing;
public interface IBlockCachePreWarmer
{
Task PreWarmCaches(Block suggestedBlock, Hash256 parentStateRoot, AccessList? systemTxAccessList, CancellationToken cancellationToken = default);
void ClearCaches();
Task ClearCachesInBackground();
bool ClearCaches();
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ protected TestBlockchain()

public static TransactionBuilder<Transaction> BuildSimpleTransaction => Builders.Build.A.Transaction.SignedAndResolved(TestItem.PrivateKeyA).To(AccountB);

private PreBlockCaches PreBlockCaches { get; } = new();

protected virtual async Task<TestBlockchain> Build(ISpecProvider? specProvider = null, UInt256? initialValues = null, bool addBlockOnStart = true)
{
Timestamper = new ManualTimestamper(new DateTime(2020, 2, 15, 12, 50, 30, DateTimeKind.Utc));
Expand All @@ -129,7 +131,7 @@ protected virtual async Task<TestBlockchain> Build(ISpecProvider? specProvider =
EthereumEcdsa = new EthereumEcdsa(SpecProvider.ChainId);
DbProvider = await CreateDbProvider();
TrieStore = new TrieStore(StateDb, LogManager);
State = new WorldState(TrieStore, DbProvider.CodeDb, LogManager, new PreBlockCaches());
State = new WorldState(TrieStore, DbProvider.CodeDb, LogManager, PreBlockCaches);

// Eip4788 precompile state account
if (specProvider?.GenesisSpec?.IsBeaconBlockRootAvailable ?? false)
Expand Down Expand Up @@ -381,7 +383,7 @@ protected virtual IBlockProcessor CreateBlockProcessor() =>
preWarmer: CreateBlockCachePreWarmer());

protected virtual IBlockCachePreWarmer CreateBlockCachePreWarmer() =>
new BlockCachePreWarmer(new ReadOnlyTxProcessingEnvFactory(WorldStateManager, BlockTree, SpecProvider, LogManager, WorldStateManager.GlobalWorldState), SpecProvider, LogManager, WorldStateManager.GlobalWorldState);
new BlockCachePreWarmer(new ReadOnlyTxProcessingEnvFactory(WorldStateManager, BlockTree, SpecProvider, LogManager, WorldStateManager.GlobalWorldState), SpecProvider, LogManager, PreBlockCaches);

public async Task WaitForNewHead()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,20 @@ public static void AddRange<T>(this ICollection<T> list, params T[] items)
public static bool NoResizeClear<TKey, TValue>(this ConcurrentDictionary<TKey, TValue>? dictionary)
where TKey : notnull
{
if (dictionary is null || dictionary.IsEmpty)
if (dictionary?.IsEmpty ?? true)
{
return false;
}

using var handle = dictionary.AcquireLock();

// Recheck under lock, so not to over clear which is expensive.
// May have cleared while waiting for lock.
if (dictionary.IsEmpty)
{
return false;
}

ClearCache<TKey, TValue>.Clear(dictionary);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected virtual Task InitBlockchain()
setApi.TxPoolInfoProvider = new TxPoolInfoProvider(chainHeadInfoProvider.AccountStateProvider, txPool);
setApi.GasPriceOracle = new GasPriceOracle(getApi.BlockTree!, getApi.SpecProvider, _api.LogManager, blocksConfig.MinGasPrice);
BlockCachePreWarmer? preWarmer = blocksConfig.PreWarmStateOnBlockProcessing
? new(new(_api.WorldStateManager!, _api.BlockTree!, _api.SpecProvider, _api.LogManager, _api.WorldState), _api.SpecProvider, _api.LogManager, _api.WorldState)
? new(new(_api.WorldStateManager!, _api.BlockTree!, _api.SpecProvider, _api.LogManager, _api.WorldState), _api.SpecProvider, _api.LogManager, preBlockCaches)
: null;
IBlockProcessor mainBlockProcessor = setApi.MainBlockProcessor = CreateBlockProcessor(preWarmer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
using Nethermind.Specs;
using Nethermind.Core.Test.Builders;
using Nethermind.Int256;
using Nethermind.JsonRpc.Modules.Eth;
using Nethermind.JsonRpc.Modules.Trace;
using Nethermind.Logging;
using NUnit.Framework;
using Nethermind.Blockchain.Find;
using Nethermind.Consensus.Processing;
Expand All @@ -25,7 +23,6 @@
using Nethermind.Core.Crypto;
using Nethermind.Db;
using Nethermind.Evm;
using Nethermind.Evm.Tracing.ParityStyle;
using Nethermind.Evm.TransactionProcessing;
using Nethermind.Facade.Eth;
using Nethermind.Serialization.Json;
Expand All @@ -36,7 +33,6 @@

namespace Nethermind.JsonRpc.Test.Modules;

[Parallelizable(ParallelScope.All)]
[TestFixture]
public class TraceRpcModuleTests
{
Expand Down
4 changes: 0 additions & 4 deletions src/Nethermind/Nethermind.State/IWorldState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,4 @@ public interface IWorldState : IJournal<Snapshot>, IReadOnlyStateProvider

void CommitTree(long blockNumber);
ArrayPoolList<AddressAsKey>? GetAccountChanges();

bool ClearCache() => false;

Task ClearCachesInBackground() => Task.CompletedTask;
}
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,6 @@ public void CommitTrees(long blockNumber)
_toUpdateRoots.Clear();
// only needed here as there is no control over cached storage size otherwise
_storages.Clear();
_preBlockCache?.NoResizeClear();
}

private StorageTree GetOrCreateStorage(Address address)
Expand Down
8 changes: 1 addition & 7 deletions src/Nethermind/Nethermind.State/PreBlockCaches.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Collections.Concurrent;
using System.Numerics;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Trie;
Expand All @@ -20,7 +19,6 @@ public class PreBlockCaches
private static int LockPartitions => CollectionExtensions.LockPartitions;

private readonly Func<bool>[] _clearCaches;
private readonly Action _clearAllCaches;

private readonly ConcurrentDictionary<StorageCell, byte[]> _storageCache = new(LockPartitions, InitialCapacity);
private readonly ConcurrentDictionary<AddressAsKey, Account> _stateCache = new(LockPartitions, InitialCapacity);
Expand All @@ -36,18 +34,14 @@ public PreBlockCaches()
_rlpCache.NoResizeClear,
_precompileCache.NoResizeClear
];

_clearAllCaches = () => ClearImmediate();
}

public ConcurrentDictionary<StorageCell, byte[]> StorageCache => _storageCache;
public ConcurrentDictionary<AddressAsKey, Account> StateCache => _stateCache;
public ConcurrentDictionary<NodeKey, byte[]?> RlpCache => _rlpCache;
public ConcurrentDictionary<PrecompileCacheKey, (ReadOnlyMemory<byte>, bool)> PrecompileCache => _precompileCache;

public Task ClearCachesInBackground() => Task.Run(_clearAllCaches);

public bool ClearImmediate()
public bool ClearCaches()
{
bool isDirty = false;
foreach (Func<bool> clearCache in _clearCaches)
Expand Down
1 change: 0 additions & 1 deletion src/Nethermind/Nethermind.State/StateProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,6 @@ public void CommitTree(long blockNumber)
}

_tree.Commit(blockNumber);
_preBlockCache?.NoResizeClear();
}

public static void CommitBranch()
Expand Down
5 changes: 0 additions & 5 deletions src/Nethermind/Nethermind.State/WorldState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
Expand Down Expand Up @@ -271,9 +270,5 @@ public void CreateAccountIfNotExists(Address address, in UInt256 balance, in UIn
ArrayPoolList<AddressAsKey>? IWorldState.GetAccountChanges() => _stateProvider.ChangedAddresses();

PreBlockCaches? IPreBlockCaches.Caches => PreBlockCaches;

public bool ClearCache() => PreBlockCaches?.ClearImmediate() == true;

public Task ClearCachesInBackground() => PreBlockCaches?.ClearCachesInBackground() ?? Task.CompletedTask;
}
}
1 change: 0 additions & 1 deletion src/Nethermind/Nethermind.Trie/PreCachedTrieStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public void CommitNode(long blockNumber, Hash256? address, in NodeCommitInfo nod
public void FinishBlockCommit(TrieType trieType, long blockNumber, Hash256? address, TrieNode? root, WriteFlags writeFlags = WriteFlags.None)
{
_inner.FinishBlockCommit(trieType, blockNumber, address, root, writeFlags);
_preBlockCache.NoResizeClear();
}

public bool IsPersisted(Hash256? address, in TreePath path, in ValueHash256 keccak)
Expand Down

0 comments on commit 55aa224

Please sign in to comment.