Skip to content

Commit

Permalink
Prewarm tx addresses in parallel (#7423)
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams authored Sep 14, 2024
1 parent d0f32a7 commit 60159fb
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,49 @@ public class BeaconBlockRootHandler(ITransactionProcessor processor) : IBeaconBl
{
private const long GasLimit = 30_000_000L;

public void StoreBeaconRoot(Block block, IReleaseSpec spec)
public (Address? toAddress, AccessList? accessList) BeaconRootsAccessList(Block block, IReleaseSpec spec, bool includeStorageCells = true)
{
BlockHeader? header = block.Header;
var canInsertBeaconRoot = spec.IsBeaconBlockRootAvailable
bool canInsertBeaconRoot = spec.IsBeaconBlockRootAvailable
&& !header.IsGenesis
&& header.ParentBeaconBlockRoot is not null;

if (canInsertBeaconRoot)
Address? eip4788ContractAddress = canInsertBeaconRoot ?
spec.Eip4788ContractAddress ?? Eip4788Constants.BeaconRootsAddress :
null;

if (eip4788ContractAddress is null)
{
return (null, null);
}

var builder = new AccessList.Builder()
.AddAddress(eip4788ContractAddress);

if (includeStorageCells)
{
builder.AddStorage(block.Timestamp % 8191);
}

return (eip4788ContractAddress, builder.Build());
}

public void StoreBeaconRoot(Block block, IReleaseSpec spec)
{
(Address? toAddress, AccessList? accessList) = BeaconRootsAccessList(block, spec, includeStorageCells: false);

if (toAddress is not null)
{
Address beaconRootsAddress = spec.Eip4788ContractAddress ?? Eip4788Constants.BeaconRootsAddress;
BlockHeader? header = block.Header;
Transaction transaction = new()
{
Value = UInt256.Zero,
Data = header.ParentBeaconBlockRoot.Bytes.ToArray(),
To = beaconRootsAddress,
To = toAddress,
SenderAddress = Address.SystemUser,
GasLimit = GasLimit,
GasPrice = UInt256.Zero,
AccessList = new AccessList.Builder().AddAddress(beaconRootsAddress).Build()
AccessList = accessList
};

transaction.Hash = transaction.CalculateHash();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
// SPDX-License-Identifier: LGPL-3.0-only

using Nethermind.Core;
using Nethermind.Core.Eip2930;
using Nethermind.Core.Specs;

namespace Nethermind.Blockchain.BeaconBlockRoot;
public interface IBeaconBlockRootHandler
{
(Address? toAddress, AccessList? accessList) BeaconRootsAccessList(Block block, IReleaseSpec spec, bool includeStorageCells = true);
void StoreBeaconRoot(Block block, IReleaseSpec spec);
}
191 changes: 128 additions & 63 deletions src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Nethermind.Evm.TransactionProcessing;
using Nethermind.Logging;
using Nethermind.State;
using Nethermind.Core.Eip2930;

namespace Nethermind.Consensus.Processing;

Expand All @@ -24,7 +25,7 @@ public class BlockCachePreWarmer(ReadOnlyTxProcessingEnvFactory envFactory, ISpe
private readonly ObjectPool<SystemTransaction> _systemTransactionPool = new DefaultObjectPool<SystemTransaction>(new DefaultPooledObjectPolicy<SystemTransaction>(), Environment.ProcessorCount);
private readonly ILogger _logger = logManager.GetClassLogger<BlockCachePreWarmer>();

public Task PreWarmCaches(Block suggestedBlock, Hash256? parentStateRoot, CancellationToken cancellationToken = default)
public Task PreWarmCaches(Block suggestedBlock, Hash256? parentStateRoot, AccessList? systemTxAccessList, CancellationToken cancellationToken = default)
{
if (targetWorldState is not null)
{
Expand All @@ -33,40 +34,38 @@ public Task PreWarmCaches(Block suggestedBlock, Hash256? parentStateRoot, Cancel
if (_logger.IsWarn) _logger.Warn("Caches are not empty. Clearing them.");
}

if (!IsGenesisBlock(parentStateRoot) && Environment.ProcessorCount > 2 && !cancellationToken.IsCancellationRequested)
var physicalCoreCount = RuntimeInformation.PhysicalCoreCount;
if (!IsGenesisBlock(parentStateRoot) && physicalCoreCount > 2 && !cancellationToken.IsCancellationRequested)
{
ParallelOptions parallelOptions = new() { MaxDegreeOfParallelism = physicalCoreCount - 1, CancellationToken = cancellationToken };

// Run address warmer ahead of transactions warmer, but queue to ThreadPool so it doesn't block the txs
ThreadPool.UnsafeQueueUserWorkItem(
new AddressWarmer(parallelOptions, suggestedBlock, parentStateRoot, systemTxAccessList, this), preferLocal: false);
// Do not pass cancellation token to the task, we don't want exceptions to be thrown in main processing thread
return Task.Run(() => PreWarmCachesParallel(suggestedBlock, parentStateRoot, cancellationToken));
return Task.Run(() => PreWarmCachesParallel(suggestedBlock, parentStateRoot, parallelOptions, cancellationToken));
}
}

return Task.CompletedTask;
}

// Parent state root is null for genesis block
private bool IsGenesisBlock(Hash256? parentStateRoot) => parentStateRoot is null;
private static bool IsGenesisBlock(Hash256? parentStateRoot) => parentStateRoot is null;

public void ClearCaches() => targetWorldState?.ClearCache();

public Task ClearCachesInBackground() => targetWorldState?.ClearCachesInBackground() ?? Task.CompletedTask;

private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot, CancellationToken cancellationToken)
private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot, ParallelOptions parallelOptions, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;

try
{
var physicalCoreCount = RuntimeInformation.PhysicalCoreCount;
if (physicalCoreCount < 2)
{
if (_logger.IsDebug) _logger.Debug("Physical core count is less than 2. Skipping pre-warming.");
return;
}
if (_logger.IsDebug) _logger.Debug($"Started pre-warming caches for block {suggestedBlock.Number}.");

ParallelOptions parallelOptions = new() { MaxDegreeOfParallelism = physicalCoreCount - 1, CancellationToken = cancellationToken };
IReleaseSpec spec = specProvider.GetSpec(suggestedBlock.Header);

WarmupTransactions(parallelOptions, spec, suggestedBlock, parentStateRoot);
WarmupWithdrawals(parallelOptions, spec, suggestedBlock, parentStateRoot);

Expand All @@ -76,75 +75,141 @@ private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot
{
if (_logger.IsDebug) _logger.Debug($"Pre-warming caches cancelled for block {suggestedBlock.Number}.");
}
}

void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spec, Block block, Hash256 stateRoot)
private void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spec, Block block, Hash256 stateRoot)
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;
if (spec.WithdrawalsEnabled && block.Withdrawals is not null)
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;
if (spec.WithdrawalsEnabled && block.Withdrawals is not null)
{
int progress = 0;
Parallel.For(0, block.Withdrawals.Length, parallelOptions,
_ =>
int progress = 0;
Parallel.For(0, block.Withdrawals.Length, parallelOptions,
_ =>
{
IReadOnlyTxProcessorSource env = _envPool.Get();
int i = 0;
try
{
IReadOnlyTxProcessorSource env = _envPool.Get();
int i = 0;
try
{
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
// Process withdrawals in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
i = Interlocked.Increment(ref progress) - 1;
scope.WorldState.WarmUp(block.Withdrawals[i].Address);
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming withdrawal {i}", ex);
}
finally
{
_envPool.Return(env);
}
});
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
// Process withdrawals in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
i = Interlocked.Increment(ref progress) - 1;
scope.WorldState.WarmUp(block.Withdrawals[i].Address);
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming withdrawal {i}", ex);
}
finally
{
_envPool.Return(env);
}
});
}
}

private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec spec, Block block, Hash256 stateRoot)
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;

int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions, _ =>
{
using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority();
IReadOnlyTxProcessorSource env = _envPool.Get();
SystemTransaction systemTransaction = _systemTransactionPool.Get();
Transaction? tx = null;
try
{
// Process transactions in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
int i = Interlocked.Increment(ref progress) - 1;
// If the transaction has already been processed or being processed, exit early
if (block.TransactionProcessed > i) return;
tx = block.Transactions[i];
tx.CopyTo(systemTransaction);
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
if (spec.UseTxAccessLists)
{
scope.WorldState.WarmUp(tx.AccessList); // eip-2930
}
TransactionResult result = scope.TransactionProcessor.Trace(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance);
if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex);
}
finally
{
_systemTransactionPool.Return(systemTransaction);
_envPool.Return(env);
}
});
}

private class AddressWarmer(ParallelOptions parallelOptions, Block block, Hash256 stateRoot, AccessList? systemTxAccessList, BlockCachePreWarmer preWarmer)
: IThreadPoolWorkItem
{
private readonly ParallelOptions ParallelOptions = parallelOptions;
private readonly Block Block = block;
private readonly Hash256 StateRoot = stateRoot;
private readonly BlockCachePreWarmer PreWarmer = preWarmer;
private readonly AccessList? SystemTxAccessList = systemTxAccessList;

void IThreadPoolWorkItem.Execute()
{
IReadOnlyTxProcessorSource env = PreWarmer._envPool.Get();
try
{
using IReadOnlyTxProcessingScope scope = env.Build(StateRoot);
WarmupAddresses(ParallelOptions, Block, scope);
}
catch (Exception ex)
{
if (PreWarmer._logger.IsDebug) PreWarmer._logger.Error($"Error pre-warming addresses", ex);
}
finally
{
PreWarmer._envPool.Return(env);
}
}

void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec spec, Block block, Hash256 stateRoot)
private void WarmupAddresses(ParallelOptions parallelOptions, Block block, IReadOnlyTxProcessingScope scope)
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;

if (SystemTxAccessList is not null)
{
scope.WorldState.WarmUp(SystemTxAccessList);
}

int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions, _ =>
Parallel.For(0, block.Transactions.Length, parallelOptions,
_ =>
{
using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority();
IReadOnlyTxProcessorSource env = _envPool.Get();
SystemTransaction systemTransaction = _systemTransactionPool.Get();
Transaction? tx = null;
int i = 0;
try
{
// Process transactions in sequential order, rather than partitioning scheme from Parallel.For
// Process addresses in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
int i = Interlocked.Increment(ref progress) - 1;
// If the transaction has already been processed or being processed, exit early
if (block.TransactionProcessed > i) return;
tx = block.Transactions[i];
tx.CopyTo(systemTransaction);
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
if (spec.UseTxAccessLists)
i = Interlocked.Increment(ref progress) - 1;
Transaction tx = block.Transactions[i];
Address? sender = tx.SenderAddress;
if (sender is not null)
{
scope.WorldState.WarmUp(sender);
}
Address to = tx.To;
if (to is not null)
{
scope.WorldState.WarmUp(tx.AccessList); // eip-2930
scope.WorldState.WarmUp(to);
}
TransactionResult result = scope.TransactionProcessor.Trace(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance);
if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex);
}
finally
{
_systemTransactionPool.Return(systemTransaction);
_envPool.Return(env);
if (PreWarmer._logger.IsDebug) PreWarmer._logger.Error($"Error pre-warming addresses {i}", ex);
}
});
}
Expand Down
27 changes: 19 additions & 8 deletions src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Nethermind.Consensus.Withdrawals;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Eip2930;
using Nethermind.Core.Specs;
using Nethermind.Crypto;
using Nethermind.Evm;
Expand Down Expand Up @@ -107,17 +108,27 @@ the previous head state.*/
BlockProcessing?.Invoke(this, new BlockEventArgs(suggestedBlock));
}

using CancellationTokenSource cancellationTokenSource = new();
Task? preWarmTask = suggestedBlock.Transactions.Length < 3
? null
: preWarmer?.PreWarmCaches(suggestedBlock, preBlockStateRoot!, cancellationTokenSource.Token);
(Block processedBlock, TxReceipt[] receipts) = ProcessOne(suggestedBlock, options, blockTracer);
// Block is processed, we can cancel the prewarm task
if (preWarmTask is not null)
Block processedBlock;
TxReceipt[] receipts;

Task? preWarmTask = null;
bool skipPrewarming = preWarmer is null || suggestedBlock.Transactions.Length < 3;
if (!skipPrewarming)
{
using CancellationTokenSource cancellationTokenSource = new();
(_, AccessList? accessList) = _beaconBlockRootHandler.BeaconRootsAccessList(suggestedBlock, _specProvider.GetSpec(suggestedBlock.Header));
preWarmTask = preWarmer.PreWarmCaches(suggestedBlock, preBlockStateRoot, accessList, cancellationTokenSource.Token);

(processedBlock, receipts) = ProcessOne(suggestedBlock, options, blockTracer);
// Block is processed, we can cancel the prewarm task
preWarmTask = preWarmTask.ContinueWith(_clearCaches).Unwrap();
cancellationTokenSource.Cancel();
}
cancellationTokenSource.Cancel();
else
{
(processedBlock, receipts) = ProcessOne(suggestedBlock, options, blockTracer);
}

processedBlocks[i] = processedBlock;

// be cautious here as AuRa depends on processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Eip2930;

namespace Nethermind.Consensus.Processing;

public interface IBlockCachePreWarmer
{
Task PreWarmCaches(Block suggestedBlock, Hash256 parentStateRoot, CancellationToken cancellationToken = default);
Task PreWarmCaches(Block suggestedBlock, Hash256 parentStateRoot, AccessList? systemTxAccessList, CancellationToken cancellationToken = default);
void ClearCaches();
Task ClearCachesInBackground();
}

0 comments on commit 60159fb

Please sign in to comment.