diff --git a/src/Nethermind/Nethermind.Api/IApiWithBlockchain.cs b/src/Nethermind/Nethermind.Api/IApiWithBlockchain.cs index 496f107e7ea..867deec5a7a 100644 --- a/src/Nethermind/Nethermind.Api/IApiWithBlockchain.cs +++ b/src/Nethermind/Nethermind.Api/IApiWithBlockchain.cs @@ -10,6 +10,7 @@ using Nethermind.Consensus; using Nethermind.Consensus.Comparers; using Nethermind.Consensus.Processing; +using Nethermind.Consensus.Processing.CensorshipDetector; using Nethermind.Consensus.Producers; using Nethermind.Consensus.Rewards; using Nethermind.Consensus.Scheduler; @@ -97,5 +98,6 @@ public interface IApiWithBlockchain : IApiWithStores, IBlockchainBridgeFactory IBlockProductionPolicy? BlockProductionPolicy { get; set; } INodeStorageFactory NodeStorageFactory { get; set; } BackgroundTaskScheduler BackgroundTaskScheduler { get; set; } + CensorshipDetector CensorshipDetector { get; set; } } } diff --git a/src/Nethermind/Nethermind.Api/NethermindApi.cs b/src/Nethermind/Nethermind.Api/NethermindApi.cs index e07aa13ba9e..821375276b5 100644 --- a/src/Nethermind/Nethermind.Api/NethermindApi.cs +++ b/src/Nethermind/Nethermind.Api/NethermindApi.cs @@ -56,6 +56,7 @@ using Nethermind.Wallet; using Nethermind.Sockets; using Nethermind.Trie; +using Nethermind.Consensus.Processing.CensorshipDetector; namespace Nethermind.Api { @@ -219,6 +220,7 @@ public ISealEngine SealEngine public IBlockProductionPolicy? BlockProductionPolicy { get; set; } public INodeStorageFactory NodeStorageFactory { get; set; } = null!; public BackgroundTaskScheduler BackgroundTaskScheduler { get; set; } = null!; + public CensorshipDetector CensorshipDetector { get; set; } = null!; public IWallet? Wallet { get; set; } public IBlockStore? BadBlocksStore { get; set; } public ITransactionComparerProvider? TransactionComparerProvider { get; set; } diff --git a/src/Nethermind/Nethermind.Blockchain.Test/BlockTreeTests.cs b/src/Nethermind/Nethermind.Blockchain.Test/BlockTreeTests.cs index a356bc99816..db457642581 100644 --- a/src/Nethermind/Nethermind.Blockchain.Test/BlockTreeTests.cs +++ b/src/Nethermind/Nethermind.Blockchain.Test/BlockTreeTests.cs @@ -1800,8 +1800,6 @@ public void BlockAddedToMain_should_have_updated_Head() Block block0 = Build.A.Block.WithNumber(0).WithDifficulty(1).TestObject; Block block1 = Build.A.Block.WithNumber(1).WithDifficulty(2).WithParent(block0).TestObject; AddToMain(blockTree, block0); - blockTree.SuggestBlock(block0); - blockTree.UpdateMainChain(new[] { block0 }, true); long blockAddedToMainHeadNumber = 0; blockTree.BlockAddedToMain += (_, _) => { blockAddedToMainHeadNumber = blockTree.Head!.Header.Number; }; diff --git a/src/Nethermind/Nethermind.Blockchain.Test/BlockchainProcessorTests.cs b/src/Nethermind/Nethermind.Blockchain.Test/BlockchainProcessorTests.cs index a5dc4884c80..41bc3472f20 100644 --- a/src/Nethermind/Nethermind.Blockchain.Test/BlockchainProcessorTests.cs +++ b/src/Nethermind/Nethermind.Blockchain.Test/BlockchainProcessorTests.cs @@ -89,6 +89,7 @@ public Block[] Process(Hash256 newBranchStateRoot, List suggestedBlocks, { BlocksProcessing?.Invoke(this, new BlocksProcessingEventArgs(suggestedBlocks)); Block suggestedBlock = suggestedBlocks[i]; + BlockProcessing?.Invoke(this, new BlockEventArgs(suggestedBlock)); Hash256 hash = suggestedBlock.Hash!; if (!_allowed.Contains(hash)) { @@ -119,6 +120,8 @@ public Block[] Process(Hash256 newBranchStateRoot, List suggestedBlocks, public event EventHandler? BlocksProcessing; + public event EventHandler? BlockProcessing; + public event EventHandler? BlockProcessed; public event EventHandler? TransactionProcessed diff --git a/src/Nethermind/Nethermind.Consensus.Test/CensorshipDetectorTests.cs b/src/Nethermind/Nethermind.Consensus.Test/CensorshipDetectorTests.cs new file mode 100644 index 00000000000..4e8ba989451 --- /dev/null +++ b/src/Nethermind/Nethermind.Consensus.Test/CensorshipDetectorTests.cs @@ -0,0 +1,291 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System.Collections.Generic; +using System.Linq; +using FluentAssertions; +using Nethermind.Blockchain; +using Nethermind.Consensus.Comparers; +using Nethermind.Consensus.Processing; +using Nethermind.Consensus.Processing.CensorshipDetector; +using Nethermind.Consensus.Validators; +using Nethermind.Core; +using Nethermind.Core.Crypto; +using Nethermind.Core.Extensions; +using Nethermind.Core.Specs; +using Nethermind.Core.Test.Builders; +using Nethermind.Crypto; +using Nethermind.Db; +using Nethermind.Logging; +using Nethermind.Specs; +using Nethermind.Specs.Forks; +using Nethermind.State; +using Nethermind.Trie.Pruning; +using Nethermind.TxPool; +using NSubstitute; +using NUnit.Framework; + +namespace Nethermind.Consensus.Test; + +[TestFixture] +public class CensorshipDetectorTests +{ + private ILogManager _logManager; + private WorldState _stateProvider; + private IBlockTree _blockTree; + private IBlockProcessor _blockProcessor; + private ISpecProvider _specProvider; + private IEthereumEcdsa _ethereumEcdsa; + private IComparer _comparer; + private TxPool.TxPool _txPool; + private CensorshipDetector _censorshipDetector; + + [SetUp] + public void Setup() + { + _logManager = LimboLogs.Instance; + TrieStore trieStore = new(new MemDb(), _logManager); + MemDb codeDb = new(); + _stateProvider = new WorldState(trieStore, codeDb, _logManager); + _blockProcessor = Substitute.For(); + } + + [TearDown] + public void TearDown() + { + _txPool.Dispose(); + _censorshipDetector.Dispose(); + } + + // Address Censorship is given to be false here since censorship is not being detected for any address. + [Test] + public void Censorship_when_address_censorship_is_false_and_high_paying_tx_censorship_is_true_for_all_blocks_in_main_cache() + { + _txPool = CreatePool(); + _censorshipDetector = new(_blockTree, _txPool, _comparer, _blockProcessor, _logManager, new CensorshipDetectorConfig() { }); + + Transaction tx1 = SubmitTxToPool(1, TestItem.PrivateKeyA, TestItem.AddressA); + Transaction tx2 = SubmitTxToPool(2, TestItem.PrivateKeyB, TestItem.AddressA); + Transaction tx3 = SubmitTxToPool(3, TestItem.PrivateKeyC, TestItem.AddressA); + Transaction tx4 = SubmitTxToPool(4, TestItem.PrivateKeyD, TestItem.AddressA); + Transaction tx5 = SubmitTxToPool(5, TestItem.PrivateKeyE, TestItem.AddressA); + + Block block1 = Build.A.Block.WithNumber(1).WithBaseFeePerGas(0).WithTransactions([tx4]).WithParentHash(TestItem.KeccakA).TestObject; + ValueHash256 blockHash1 = block1.Hash!; + BlockProcessingWorkflow(block1); + + Block block2 = Build.A.Block.WithNumber(2).WithBaseFeePerGas(0).WithTransactions([tx3]).WithParentHash(blockHash1).TestObject; + ValueHash256 blockHash2 = block2.Hash!; + BlockProcessingWorkflow(block2); + + Block block3 = Build.A.Block.WithNumber(3).WithBaseFeePerGas(0).WithTransactions([tx2]).WithParentHash(blockHash2).TestObject; + ValueHash256 blockHash3 = block3.Hash!; + BlockProcessingWorkflow(block3); + + Block block4 = Build.A.Block.WithNumber(4).WithBaseFeePerGas(0).WithTransactions([tx1]).WithParentHash(blockHash3).TestObject; + BlockProcessingWorkflow(block4); + + Assert.That(() => _censorshipDetector.GetCensoredBlocks().Contains(new BlockNumberHash(block4)), Is.EqualTo(true).After(10, 1)); + } + + // Address Censorship is given to be false here since censorship is not being detected for any address. + [Test] + public void No_censorship_when_address_censorship_is_false_and_high_paying_tx_censorship_is_false_for_some_blocks_in_main_cache() + { + _txPool = CreatePool(); + _censorshipDetector = new(_blockTree, _txPool, _comparer, _blockProcessor, _logManager, new CensorshipDetectorConfig() { }); + + Transaction tx1 = SubmitTxToPool(1, TestItem.PrivateKeyA, TestItem.AddressA); + Transaction tx2 = SubmitTxToPool(2, TestItem.PrivateKeyB, TestItem.AddressA); + Transaction tx3 = SubmitTxToPool(3, TestItem.PrivateKeyC, TestItem.AddressA); + Transaction tx4 = SubmitTxToPool(4, TestItem.PrivateKeyD, TestItem.AddressA); + Transaction tx5 = SubmitTxToPool(5, TestItem.PrivateKeyE, TestItem.AddressA); + + // high-paying tx censorship: true + Block block1 = Build.A.Block.WithNumber(1).WithBaseFeePerGas(0).WithTransactions([tx4]).WithParentHash(TestItem.KeccakA).TestObject; + ValueHash256 blockHash1 = block1.Hash!; + BlockProcessingWorkflow(block1); + + // address censorship: false + Block block2 = Build.A.Block.WithNumber(2).WithBaseFeePerGas(0).WithTransactions([tx3, tx5]).WithParentHash(blockHash1).TestObject; + ValueHash256 blockHash2 = block2.Hash!; + BlockProcessingWorkflow(block2); + + // high-paying tx censorship: false + Block block3 = Build.A.Block.WithNumber(3).WithBaseFeePerGas(0).WithTransactions([tx2]).WithParentHash(blockHash2).TestObject; + ValueHash256 blockHash3 = block3.Hash!; + BlockProcessingWorkflow(block3); + + // high-paying tx censorship: false + Block block4 = Build.A.Block.WithNumber(4).WithBaseFeePerGas(0).WithTransactions([tx1]).WithParentHash(blockHash3).TestObject; + BlockProcessingWorkflow(block4); + + Assert.That(() => _censorshipDetector.GetCensoredBlocks().Contains(new BlockNumberHash(block4)), Is.EqualTo(false).After(10, 1)); + } + + // High-Paying Tx Censorship is given to be false here. + [Test] + public void Censorship_when_high_paying_tx_censorship_is_false_and_address_censorship_is_true_for_all_blocks_in_main_cache() + { + _txPool = CreatePool(); + _censorshipDetector = new( + _blockTree, + _txPool, + _comparer, + _blockProcessor, + _logManager, + new CensorshipDetectorConfig() + { + AddressesForCensorshipDetection = [ + TestItem.AddressA.ToString(), + TestItem.AddressB.ToString(), + TestItem.AddressC.ToString(), + TestItem.AddressD.ToString(), + TestItem.AddressE.ToString(), + TestItem.AddressF.ToString()] + }); + + Transaction tx1 = SubmitTxToPool(1, TestItem.PrivateKeyA, TestItem.AddressA); + Transaction tx2 = SubmitTxToPool(2, TestItem.PrivateKeyB, TestItem.AddressB); + Transaction tx3 = SubmitTxToPool(3, TestItem.PrivateKeyC, TestItem.AddressC); + Transaction tx4 = SubmitTxToPool(4, TestItem.PrivateKeyD, TestItem.AddressD); + Transaction tx5 = SubmitTxToPool(5, TestItem.PrivateKeyE, TestItem.AddressE); + Transaction tx6 = SubmitTxToPool(6, TestItem.PrivateKeyF, TestItem.AddressF); + + Block block1 = Build.A.Block.WithNumber(1).WithBaseFeePerGas(0).WithTransactions([tx1, tx6]).WithParentHash(TestItem.KeccakA).TestObject; + ValueHash256 blockHash1 = block1.Hash!; + BlockProcessingWorkflow(block1); + + Transaction tx7 = SubmitTxToPool(7, TestItem.PrivateKeyA, TestItem.AddressA); + Transaction tx8 = SubmitTxToPool(8, TestItem.PrivateKeyF, TestItem.AddressF); + + Block block2 = Build.A.Block.WithNumber(2).WithBaseFeePerGas(0).WithTransactions([tx2, tx8]).WithParentHash(blockHash1).TestObject; + ValueHash256 blockHash2 = block2.Hash!; + BlockProcessingWorkflow(block2); + + Transaction tx9 = SubmitTxToPool(9, TestItem.PrivateKeyB, TestItem.AddressB); + Transaction tx10 = SubmitTxToPool(10, TestItem.PrivateKeyF, TestItem.AddressF); + + Block block3 = Build.A.Block.WithNumber(3).WithBaseFeePerGas(0).WithTransactions([tx3, tx10]).WithParentHash(blockHash2).TestObject; + ValueHash256 blockHash3 = block3.Hash!; + BlockProcessingWorkflow(block3); + + Transaction tx11 = SubmitTxToPool(11, TestItem.PrivateKeyC, TestItem.AddressC); + Transaction tx12 = SubmitTxToPool(12, TestItem.PrivateKeyF, TestItem.AddressF); + + Block block4 = Build.A.Block.WithNumber(4).WithBaseFeePerGas(0).WithTransactions([tx4, tx12]).WithParentHash(blockHash3).TestObject; + BlockProcessingWorkflow(block4); + + Assert.That(() => _censorshipDetector.GetCensoredBlocks().Contains(new BlockNumberHash(block4)), Is.EqualTo(true).After(10, 1)); + } + + // High-Paying Tx Censorship is given to be false here. + [Test] + public void No_censorship_when_high_paying_tx_censorship_is_false_and_address_censorship_is_false_for_some_blocks_in_main_cache() + { + _txPool = CreatePool(); + _censorshipDetector = new( + _blockTree, + _txPool, + _comparer, + _blockProcessor, + _logManager, + new CensorshipDetectorConfig() + { + AddressesForCensorshipDetection = [ + TestItem.AddressA.ToString(), + TestItem.AddressB.ToString(), + TestItem.AddressC.ToString(), + TestItem.AddressD.ToString(), + TestItem.AddressE.ToString()] + }); + + Transaction tx1 = SubmitTxToPool(1, TestItem.PrivateKeyA, TestItem.AddressA); + Transaction tx2 = SubmitTxToPool(2, TestItem.PrivateKeyB, TestItem.AddressB); + Transaction tx3 = SubmitTxToPool(3, TestItem.PrivateKeyC, TestItem.AddressC); + Transaction tx4 = SubmitTxToPool(4, TestItem.PrivateKeyD, TestItem.AddressD); + Transaction tx5 = SubmitTxToPool(5, TestItem.PrivateKeyE, TestItem.AddressE); + + // address censorship: false + Block block1 = Build.A.Block.WithNumber(1).WithBaseFeePerGas(0).WithTransactions([tx3, tx4, tx5]).WithParentHash(TestItem.KeccakA).TestObject; + ValueHash256 blockHash1 = block1.Hash!; + BlockProcessingWorkflow(block1); + + Transaction tx6 = SubmitTxToPool(6, TestItem.PrivateKeyC, TestItem.AddressC); + Transaction tx7 = SubmitTxToPool(7, TestItem.PrivateKeyD, TestItem.AddressD); + Transaction tx8 = SubmitTxToPool(8, TestItem.PrivateKeyE, TestItem.AddressE); + + // address censorship: false + Block block2 = Build.A.Block.WithNumber(2).WithBaseFeePerGas(0).WithTransactions([tx7, tx8]).WithParentHash(blockHash1).TestObject; + ValueHash256 blockHash2 = block2.Hash!; + BlockProcessingWorkflow(block2); + + Transaction tx9 = SubmitTxToPool(9, TestItem.PrivateKeyD, TestItem.AddressD); + Transaction tx10 = SubmitTxToPool(10, TestItem.PrivateKeyE, TestItem.AddressE); + + // address censorship: true + Block block3 = Build.A.Block.WithNumber(3).WithBaseFeePerGas(0).WithTransactions([tx1, tx10]).WithParentHash(blockHash2).TestObject; + ValueHash256 blockHash3 = block3.Hash!; + BlockProcessingWorkflow(block3); + + // address censorship: false + Block block4 = Build.A.Block.WithNumber(4).WithBaseFeePerGas(0).WithTransactions([tx2, tx6, tx9]).WithParentHash(blockHash3).TestObject; + BlockProcessingWorkflow(block4); + + Assert.That(() => _censorshipDetector.GetCensoredBlocks().Contains(new BlockNumberHash(block4)), Is.EqualTo(false).After(10, 1)); + } + + private TxPool.TxPool CreatePool(bool eip1559Enabled = true) + { + if (eip1559Enabled) + { + _specProvider = Substitute.For(); + _specProvider.GetSpec(Arg.Any()).IsEip1559Enabled.Returns(true); + } + else + { + _specProvider = MainnetSpecProvider.Instance; + } + + _blockTree = Substitute.For(); + _blockTree.FindBestSuggestedHeader().Returns(Build.A.BlockHeader.WithNumber(1_000_000).TestObject); + _blockTree.Head.Returns(Build.A.Block.WithNumber(1_000_000).TestObject); + _ethereumEcdsa = new EthereumEcdsa(_specProvider.ChainId); + _comparer = new TransactionComparerProvider(_specProvider, _blockTree).GetDefaultComparer(); + + return new( + _ethereumEcdsa, + new BlobTxStorage(), + new ChainHeadInfoProvider(_specProvider, _blockTree, _stateProvider), + new TxPoolConfig(), + new TxValidator(_specProvider.ChainId), + _logManager, + _comparer); + } + + private void BlockProcessingWorkflow(Block block) + { + _blockProcessor.BlockProcessing += Raise.EventWith(new BlockEventArgs(block)); + Assert.That(() => _censorshipDetector.BlockPotentiallyCensored(block.Number, block.Hash), Is.EqualTo(true).After(10, 1)); + + foreach (Transaction tx in block.Transactions) + { + _txPool.RemoveTransaction(tx.Hash); + } + } + + private Transaction SubmitTxToPool(int maxPriorityFeePerGas, PrivateKey privateKey, Address address) + { + Transaction tx = Build.A.Transaction. + WithType(TxType.EIP1559). + WithMaxFeePerGas(20.Wei()). + WithMaxPriorityFeePerGas(maxPriorityFeePerGas.Wei()). + WithTo(address). + SignedAndResolved(_ethereumEcdsa, privateKey). + TestObject; + _stateProvider.CreateAccount(tx.SenderAddress, 1_000_000.Wei()); + AcceptTxResult result = _txPool.SubmitTx(tx, TxHandlingOptions.PersistentBroadcast); + result.Should().Be(AcceptTxResult.Accepted); + return tx; + } +} diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs index 7a2b7a6dfa9..53e0fc89a30 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs @@ -102,6 +102,11 @@ the previous head state.*/ if (_logger.IsInfo) _logger.Info($"Processing part of a long blocks branch {i}/{blocksCount}. Block: {suggestedBlock}"); } + if (notReadOnly) + { + BlockProcessing?.Invoke(this, new BlockEventArgs(suggestedBlock)); + } + using CancellationTokenSource cancellationTokenSource = new(); Task? preWarmTask = suggestedBlock.Transactions.Length < 3 ? null @@ -162,6 +167,8 @@ the previous head state.*/ public event EventHandler? BlocksProcessing; + public event EventHandler? BlockProcessing; + // TODO: move to branch processor private void InitBranch(Hash256 branchStateRoot, bool incrementReorgMetric = true) { diff --git a/src/Nethermind/Nethermind.Consensus/Processing/CensorshipDetector/CensorshipDetector.cs b/src/Nethermind/Nethermind.Consensus/Processing/CensorshipDetector/CensorshipDetector.cs new file mode 100644 index 00000000000..018da79aa26 --- /dev/null +++ b/src/Nethermind/Nethermind.Consensus/Processing/CensorshipDetector/CensorshipDetector.cs @@ -0,0 +1,272 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Nethermind.Blockchain; +using Nethermind.Core; +using Nethermind.Core.Caching; +using Nethermind.Core.Crypto; +using Nethermind.Crypto; +using Nethermind.Int256; +using Nethermind.Logging; +using Nethermind.TxPool; + +namespace Nethermind.Consensus.Processing.CensorshipDetector; + +public class CensorshipDetector : IDisposable +{ + private readonly IBlockTree _blockTree; + private readonly ITxPool _txPool; + private readonly IComparer _betterTxComparer; + private readonly IBlockProcessor _blockProcessor; + private readonly ILogger _logger; + private readonly Dictionary? _bestTxPerObservedAddresses; + private readonly LruCache _potentiallyCensoredBlocks; + private readonly WrapAroundArray _censoredBlocks; + private readonly uint _blockCensorshipThreshold; + private readonly int _cacheSize; + + public CensorshipDetector( + IBlockTree blockTree, + ITxPool txPool, + IComparer betterTxComparer, + IBlockProcessor blockProcessor, + ILogManager logManager, + ICensorshipDetectorConfig censorshipDetectorConfig) + { + _blockTree = blockTree; + _txPool = txPool; + _betterTxComparer = betterTxComparer; + _blockProcessor = blockProcessor; + _blockCensorshipThreshold = censorshipDetectorConfig.BlockCensorshipThreshold; + _cacheSize = (int)(4 * _blockCensorshipThreshold); + _logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager)); + + if (censorshipDetectorConfig.AddressesForCensorshipDetection is not null) + { + foreach (string hexString in censorshipDetectorConfig.AddressesForCensorshipDetection) + { + if (Address.TryParse(hexString, out Address address)) + { + _bestTxPerObservedAddresses ??= new Dictionary(); + _bestTxPerObservedAddresses[address!] = null; + } + else + { + if (_logger.IsWarn) _logger.Warn($"Invalid address {hexString} provided for censorship detection."); + } + } + } + + _potentiallyCensoredBlocks = new(_cacheSize, _cacheSize, "potentiallyCensoredBlocks"); + _censoredBlocks = new(_cacheSize); + _blockProcessor.BlockProcessing += OnBlockProcessing; + } + + private bool IsSyncing() + { + long bestSuggestedNumber = _blockTree.FindBestSuggestedHeader()?.Number ?? 0; + if (bestSuggestedNumber == 0) + { + return true; + } + long headNumberOrZero = _blockTree.Head?.Number ?? 0; + return bestSuggestedNumber > headNumberOrZero; + } + + private void OnBlockProcessing(object? sender, BlockEventArgs e) + { + // skip censorship detection if node is not synced yet + if (IsSyncing()) return; + + bool tracksPerAddressCensorship = _bestTxPerObservedAddresses is not null; + if (tracksPerAddressCensorship) + { + UInt256 baseFee = e.Block.BaseFeePerGas; + IEnumerable poolBestTransactions = _txPool.GetBestTxOfEachSender(); + foreach (Transaction tx in poolBestTransactions) + { + // checking tx.GasBottleneck > baseFee ensures only ready transactions are considered. + if (tx.To is not null + && tx.GasBottleneck > baseFee + && _bestTxPerObservedAddresses.TryGetValue(tx.To, out Transaction? bestTx) + && (bestTx is null || _betterTxComparer.Compare(bestTx, tx) > 0)) + { + _bestTxPerObservedAddresses[tx.To] = tx; + } + } + } + + Task.Run(() => Cache(e.Block)); + } + + private void Cache(Block block) + { + bool tracksPerAddressCensorship = _bestTxPerObservedAddresses is not null; + + try + { + if (block.Transactions.Length == 0) + { + BlockCensorshipInfo blockCensorshipInfo = new(false, block.ParentHash); + BlockNumberHash blockNumberHash = new BlockNumberHash(block); + _potentiallyCensoredBlocks.Set(blockNumberHash, blockCensorshipInfo); + } + else + { + // Number of unique addresses specified by the user for censorship detection, to which txs are sent in the block. + long blockTxsOfTrackedAddresses = 0; + + // Number of unique addresses specified by the user for censorship detection, to which includable txs are sent in the pool. + // Includable txs consist of pool transactions better than the worst tx in block. + long poolTxsThatAreBetterThanWorstInBlock = 0; + + Transaction bestTxInBlock = block.Transactions[0]; + Transaction worstTxInBlock = block.Transactions[0]; + HashSet trackedAddressesInBlock = []; + + foreach (Transaction tx in block.Transactions) + { + if (!tx.SupportsBlobs) + { + // Finds best tx in block + if (_betterTxComparer.Compare(bestTxInBlock, tx) > 0) + { + bestTxInBlock = tx; + } + + if (tracksPerAddressCensorship) + { + // Finds worst tx in pool to compare with pool transactions of tracked addresses + if (_betterTxComparer.Compare(worstTxInBlock, tx) < 0) + { + worstTxInBlock = tx; + } + + bool trackAddress = _bestTxPerObservedAddresses.ContainsKey(tx.To!); + if (trackAddress && trackedAddressesInBlock.Add(tx.To!)) + { + blockTxsOfTrackedAddresses++; + } + } + } + } + + if (tracksPerAddressCensorship) + { + foreach (Transaction? bestTx in _bestTxPerObservedAddresses.Values) + { + // if there is no transaction in block or the best tx in the pool is better than the worst tx in the block + if (bestTx is null || _betterTxComparer.Compare(bestTx, worstTxInBlock) < 0) + { + poolTxsThatAreBetterThanWorstInBlock++; + } + } + } + + // Checking to see if the block exhibits high-paying tx censorship or address censorship or both. + // High-paying tx censorship is flagged if the best tx in the pool is not included in the block. + // Address censorship is flagged if txs sent to less than half of the user-specified addresses + // for censorship detection with includable txs in the pool are included in the block. + bool isCensored = _betterTxComparer.Compare(bestTxInBlock, _txPool.GetBestTx()) > 0 + || blockTxsOfTrackedAddresses * 2 < poolTxsThatAreBetterThanWorstInBlock; + + BlockCensorshipInfo blockCensorshipInfo = new(isCensored, block.ParentHash); + BlockNumberHash blockNumberHash = new BlockNumberHash(block); + _potentiallyCensoredBlocks.Set(blockNumberHash, blockCensorshipInfo); + + if (isCensored) + { + Metrics.NumberOfPotentiallyCensoredBlocks++; + Metrics.LastPotentiallyCensoredBlockNumber = block.Number; + DetectMultiBlockCensorship(blockNumberHash, blockCensorshipInfo); + } + } + } + finally + { + if (tracksPerAddressCensorship) + { + foreach (AddressAsKey key in _bestTxPerObservedAddresses.Keys) + { + _bestTxPerObservedAddresses[key] = null; + } + } + } + } + + private void DetectMultiBlockCensorship(BlockNumberHash block, BlockCensorshipInfo blockCensorshipInfo) + { + if (DetectPastBlockCensorship() && !_censoredBlocks.Contains(block)) + { + _censoredBlocks.Add(block); + Metrics.NumberOfCensoredBlocks++; + Metrics.LastCensoredBlockNumber = block.Number; + if (_logger.IsInfo) _logger.Info($"Censorship detected for block {block.Number} with hash {block.Hash!}"); + } + + bool DetectPastBlockCensorship() + { + // Censorship is detected if potential censorship is flagged for the last _blockCensorshipThreshold blocks including the latest. + if (block.Number >= _blockCensorshipThreshold) + { + long blockNumber = block.Number - 1; + ValueHash256 parentHash = blockCensorshipInfo.ParentHash!.Value; + for (int i = 1; i < _blockCensorshipThreshold; i++) + { + BlockCensorshipInfo info = _potentiallyCensoredBlocks.Get(new BlockNumberHash(blockNumber, parentHash)); + + if (!info.IsCensored) + { + return false; + } + + parentHash = info.ParentHash!.Value; + blockNumber--; + } + + return true; + } + + return false; + } + } + + public IEnumerable GetCensoredBlocks() => _censoredBlocks; + + public bool BlockPotentiallyCensored(long blockNumber, ValueHash256 blockHash) => _potentiallyCensoredBlocks.Contains(new BlockNumberHash(blockNumber, blockHash)); + + public void Dispose() + { + _blockProcessor.BlockProcessing -= OnBlockProcessing; + } +} + +public readonly record struct BlockCensorshipInfo(bool IsCensored, ValueHash256? ParentHash); + +public readonly record struct BlockNumberHash(long Number, ValueHash256 Hash) : IEquatable +{ + public BlockNumberHash(Block block) : this(block.Number, block.Hash ?? block.CalculateHash()) { } +} + +public class WrapAroundArray(long maxSize = 1) : IEnumerable +{ + private readonly T[] _items = new T[maxSize]; + private long _counter; + + public void Add(T item) + { + _items[(int)(_counter % maxSize)] = item; + _counter++; + } + + public bool Contains(T item) => _items.Contains(item); + + public IEnumerator GetEnumerator() => ((IEnumerable)_items).GetEnumerator(); + + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); +} diff --git a/src/Nethermind/Nethermind.Consensus/Processing/CensorshipDetector/CensorshipDetectorConfig.cs b/src/Nethermind/Nethermind.Consensus/Processing/CensorshipDetector/CensorshipDetectorConfig.cs new file mode 100644 index 00000000000..c350c8bf8b4 --- /dev/null +++ b/src/Nethermind/Nethermind.Consensus/Processing/CensorshipDetector/CensorshipDetectorConfig.cs @@ -0,0 +1,11 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +namespace Nethermind.Consensus.Processing.CensorshipDetector; + +public class CensorshipDetectorConfig : ICensorshipDetectorConfig +{ + public bool Enabled { get; set; } = true; + public uint BlockCensorshipThreshold { get; set; } = 2; + public string[]? AddressesForCensorshipDetection { get; set; } = null; +} diff --git a/src/Nethermind/Nethermind.Consensus/Processing/CensorshipDetector/ICensorshipDetectorConfig.cs b/src/Nethermind/Nethermind.Consensus/Processing/CensorshipDetector/ICensorshipDetectorConfig.cs new file mode 100644 index 00000000000..29dec2ddf4d --- /dev/null +++ b/src/Nethermind/Nethermind.Consensus/Processing/CensorshipDetector/ICensorshipDetectorConfig.cs @@ -0,0 +1,21 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using Nethermind.Config; + +namespace Nethermind.Consensus.Processing.CensorshipDetector; + +public interface ICensorshipDetectorConfig : IConfig +{ + [ConfigItem(DefaultValue = "true", + Description = "Enabling censorship detection feature")] + bool Enabled { get; set; } + + [ConfigItem(DefaultValue = "2", + Description = "Number of consecutive blocks with detected potential censorship to report censorship attempt")] + uint BlockCensorshipThreshold { get; set; } + + [ConfigItem(DefaultValue = "null", + Description = "The addresses for which censorship is being detected.")] + string[]? AddressesForCensorshipDetection { get; set; } +} diff --git a/src/Nethermind/Nethermind.Consensus/Processing/CensorshipDetector/Metrics.cs b/src/Nethermind/Nethermind.Consensus/Processing/CensorshipDetector/Metrics.cs new file mode 100644 index 00000000000..8b1b45758cb --- /dev/null +++ b/src/Nethermind/Nethermind.Consensus/Processing/CensorshipDetector/Metrics.cs @@ -0,0 +1,26 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System.ComponentModel; +using Nethermind.Core.Attributes; + +namespace Nethermind.Consensus.Processing.CensorshipDetector; + +public static class Metrics +{ + [CounterMetric] + [Description("Total number of censored blocks.")] + public static long NumberOfCensoredBlocks; + + [CounterMetric] + [Description("Total number of potentially censored blocks.")] + public static long NumberOfPotentiallyCensoredBlocks; + + [GaugeMetric] + [Description("Number of last potentially censored block.")] + public static long LastPotentiallyCensoredBlockNumber; + + [GaugeMetric] + [Description("Number of last known censored block.")] + public static long LastCensoredBlockNumber; +} diff --git a/src/Nethermind/Nethermind.Consensus/Processing/IBlockProcessor.cs b/src/Nethermind/Nethermind.Consensus/Processing/IBlockProcessor.cs index 8b2af1574b7..c8dbed52213 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/IBlockProcessor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/IBlockProcessor.cs @@ -32,6 +32,11 @@ Block[] Process( /// event EventHandler BlocksProcessing; + /// + /// Fired when a block is being processed. + /// + event EventHandler BlockProcessing; + /// /// Fired after a block has been processed. /// diff --git a/src/Nethermind/Nethermind.Consensus/Processing/NullBlockProcessor.cs b/src/Nethermind/Nethermind.Consensus/Processing/NullBlockProcessor.cs index ad47599ff84..6a5814c9b76 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/NullBlockProcessor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/NullBlockProcessor.cs @@ -26,6 +26,12 @@ public event EventHandler BlocksProcessing remove { } } + public event EventHandler? BlockProcessing + { + add { } + remove { } + } + public event EventHandler BlockProcessed { add { } diff --git a/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs b/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs index 5bbdd17de96..b711285e1c0 100644 --- a/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs +++ b/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs @@ -1,10 +1,7 @@ // SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited // SPDX-License-Identifier: LGPL-3.0-only -using System; using System.Collections.Generic; -using System.IO.Abstractions; -using System.Linq; using System.Threading; using System.Threading.Tasks; using Nethermind.Api; @@ -12,35 +9,22 @@ using Nethermind.Blockchain.BeaconBlockRoot; using Nethermind.Blockchain.Blocks; using Nethermind.Blockchain.Filters; -using Nethermind.Blockchain.FullPruning; using Nethermind.Blockchain.Receipts; using Nethermind.Blockchain.Services; -using Nethermind.Blockchain.Synchronization; using Nethermind.Config; using Nethermind.Consensus; using Nethermind.Consensus.Comparers; using Nethermind.Consensus.Processing; +using Nethermind.Consensus.Processing.CensorshipDetector; using Nethermind.Consensus.Producers; using Nethermind.Consensus.Scheduler; using Nethermind.Consensus.Validators; using Nethermind.Core; using Nethermind.Core.Attributes; -using Nethermind.Core.Crypto; -using Nethermind.Core.Extensions; -using Nethermind.Db; -using Nethermind.Db.FullPruning; using Nethermind.Evm; using Nethermind.Evm.TransactionProcessing; -using Nethermind.JsonRpc.Converters; -using Nethermind.JsonRpc.Modules.DebugModule; using Nethermind.JsonRpc.Modules.Eth.GasPrice; -using Nethermind.JsonRpc.Modules.Trace; -using Nethermind.Logging; -using Nethermind.Serialization.Json; using Nethermind.State; -using Nethermind.Synchronization.Trie; -using Nethermind.Trie; -using Nethermind.Trie.Pruning; using Nethermind.TxPool; using Nethermind.Wallet; @@ -143,6 +127,21 @@ protected virtual Task InitBlockchain() setApi.BackgroundTaskScheduler = backgroundTaskScheduler; _api.DisposeStack.Push(backgroundTaskScheduler); + ICensorshipDetectorConfig censorshipDetectorConfig = _api.Config(); + if (censorshipDetectorConfig.Enabled) + { + CensorshipDetector censorshipDetector = new( + _api.BlockTree!, + txPool, + CreateTxPoolTxComparer(), + mainBlockProcessor, + _api.LogManager, + censorshipDetectorConfig + ); + setApi.CensorshipDetector = censorshipDetector; + _api.DisposeStack.Push(censorshipDetector); + } + return Task.CompletedTask; } diff --git a/src/Nethermind/Nethermind.Merge.Plugin.Test/EngineModuleTests.Setup.cs b/src/Nethermind/Nethermind.Merge.Plugin.Test/EngineModuleTests.Setup.cs index 0cc6b0a5338..6983e6113ba 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin.Test/EngineModuleTests.Setup.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin.Test/EngineModuleTests.Setup.cs @@ -5,7 +5,6 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; -using Nethermind.Api; using Nethermind.Blockchain; using Nethermind.Blockchain.BeaconBlockRoot; using Nethermind.Blockchain.Blocks; @@ -309,6 +308,12 @@ public event EventHandler? BlocksProcessing remove => _blockProcessorImplementation.BlocksProcessing -= value; } + public event EventHandler? BlockProcessing + { + add => _blockProcessorImplementation.BlockProcessing += value; + remove => _blockProcessorImplementation.BlockProcessing -= value; + } + public event EventHandler? BlockProcessed { add => _blockProcessorImplementation.BlockProcessed += value; diff --git a/src/Nethermind/Nethermind.Merge.Plugin/Data/GetPayloadV3Result.cs b/src/Nethermind/Nethermind.Merge.Plugin/Data/GetPayloadV3Result.cs index 492d81ad71e..c12f059b149 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin/Data/GetPayloadV3Result.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin/Data/GetPayloadV3Result.cs @@ -18,8 +18,7 @@ public GetPayloadV3Result(Block block, UInt256 blockFees, BlobsBundleV1 blobsBun public override ExecutionPayloadV3 ExecutionPayload { get; } - public bool ShouldOverrideBuilder { get; } - + public bool ShouldOverrideBuilder { get; init; } public override string ToString() => $"{{ExecutionPayload: {ExecutionPayload}, Fees: {BlockValue}, BlobsBundle blobs count: {BlobsBundle.Blobs.Length}, ShouldOverrideBuilder {ShouldOverrideBuilder}}}"; } diff --git a/src/Nethermind/Nethermind.Merge.Plugin/Handlers/GetPayloadV3Handler.cs b/src/Nethermind/Nethermind.Merge.Plugin/Handlers/GetPayloadV3Handler.cs index 9555b51f1a9..53f52a3187a 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin/Handlers/GetPayloadV3Handler.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin/Handlers/GetPayloadV3Handler.cs @@ -1,10 +1,12 @@ // SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited // SPDX-License-Identifier: LGPL-3.0-only +using System.Linq; using Nethermind.Core.Specs; using Nethermind.Logging; using Nethermind.Merge.Plugin.BlockProduction; using Nethermind.Merge.Plugin.Data; +using Nethermind.Consensus.Processing.CensorshipDetector; namespace Nethermind.Merge.Plugin.Handlers; @@ -14,11 +16,22 @@ namespace Nethermind.Merge.Plugin.Handlers; /// public class GetPayloadV3Handler : GetPayloadHandlerBase { - public GetPayloadV3Handler(IPayloadPreparationService payloadPreparationService, ISpecProvider specProvider, ILogManager logManager) : base( + private readonly CensorshipDetector? _censorshipDetector; + public GetPayloadV3Handler( + IPayloadPreparationService payloadPreparationService, + ISpecProvider specProvider, + ILogManager logManager, + CensorshipDetector? censorshipDetector = null) : base( 3, payloadPreparationService, specProvider, logManager) { + _censorshipDetector = censorshipDetector; } - protected override GetPayloadV3Result GetPayloadResultFromBlock(IBlockProductionContext context) => - new(context.CurrentBestBlock!, context.BlockFees, new BlobsBundleV1(context.CurrentBestBlock!)); + protected override GetPayloadV3Result GetPayloadResultFromBlock(IBlockProductionContext context) + { + return new(context.CurrentBestBlock!, context.BlockFees, new BlobsBundleV1(context.CurrentBestBlock!)) + { + ShouldOverrideBuilder = _censorshipDetector?.GetCensoredBlocks().Contains(new BlockNumberHash(context.CurrentBestBlock!)) ?? false + }; + } } diff --git a/src/Nethermind/Nethermind.Merge.Plugin/MergePlugin.cs b/src/Nethermind/Nethermind.Merge.Plugin/MergePlugin.cs index 774eebf1d20..2d7ce1edee8 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin/MergePlugin.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin/MergePlugin.cs @@ -324,7 +324,7 @@ public Task InitRpcModules() IEngineRpcModule engineRpcModule = new EngineRpcModule( new GetPayloadV1Handler(payloadPreparationService, _api.SpecProvider, _api.LogManager), new GetPayloadV2Handler(payloadPreparationService, _api.SpecProvider, _api.LogManager), - new GetPayloadV3Handler(payloadPreparationService, _api.SpecProvider, _api.LogManager), + new GetPayloadV3Handler(payloadPreparationService, _api.SpecProvider, _api.LogManager, _api.CensorshipDetector), new NewPayloadHandler( _api.BlockValidator, _api.BlockTree, diff --git a/src/Nethermind/Nethermind.Optimism/OptimismPlugin.cs b/src/Nethermind/Nethermind.Optimism/OptimismPlugin.cs index 9659aee08c2..c9df512ebe9 100644 --- a/src/Nethermind/Nethermind.Optimism/OptimismPlugin.cs +++ b/src/Nethermind/Nethermind.Optimism/OptimismPlugin.cs @@ -234,7 +234,7 @@ public async Task InitRpcModules() IEngineRpcModule engineRpcModule = new EngineRpcModule( new GetPayloadV1Handler(payloadPreparationService, _api.SpecProvider, _api.LogManager), new GetPayloadV2Handler(payloadPreparationService, _api.SpecProvider, _api.LogManager), - new GetPayloadV3Handler(payloadPreparationService, _api.SpecProvider, _api.LogManager), + new GetPayloadV3Handler(payloadPreparationService, _api.SpecProvider, _api.LogManager, _api.CensorshipDetector), new NewPayloadHandler( _api.BlockValidator, _api.BlockTree, diff --git a/src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs b/src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs index c1771f87338..98a32ce04e3 100644 --- a/src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs +++ b/src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs @@ -195,6 +195,14 @@ public EnhancedSortedSet GetFirsts() return sortedValues; } + /// + /// Returns best overall element as per supplied comparer order. + /// + public TValue? GetBest() + { + return GetFirsts().Min; + } + /// /// Gets last element in supplied comparer order. /// diff --git a/src/Nethermind/Nethermind.TxPool/ITxPool.cs b/src/Nethermind/Nethermind.TxPool/ITxPool.cs index 7fbb8102149..2c11c7515af 100644 --- a/src/Nethermind/Nethermind.TxPool/ITxPool.cs +++ b/src/Nethermind/Nethermind.TxPool/ITxPool.cs @@ -38,6 +38,8 @@ public interface ITxPool bool ContainsTx(Hash256 hash, TxType txType); AcceptTxResult SubmitTx(Transaction tx, TxHandlingOptions handlingOptions); bool RemoveTransaction(Hash256? hash); + Transaction? GetBestTx(); + IEnumerable GetBestTxOfEachSender(); bool IsKnown(Hash256 hash); bool TryGetPendingTransaction(Hash256 hash, [NotNullWhen(true)] out Transaction? transaction); bool TryGetPendingBlobTransaction(Hash256 hash, [NotNullWhen(true)] out Transaction? blobTransaction); diff --git a/src/Nethermind/Nethermind.TxPool/NullTxPool.cs b/src/Nethermind/Nethermind.TxPool/NullTxPool.cs index 57d830e3b52..d0f1587522e 100644 --- a/src/Nethermind/Nethermind.TxPool/NullTxPool.cs +++ b/src/Nethermind/Nethermind.TxPool/NullTxPool.cs @@ -38,6 +38,10 @@ public void RemovePeer(PublicKey nodeId) { } public bool RemoveTransaction(Hash256? hash) => false; + public Transaction? GetBestTx() => null; + + public IEnumerable GetBestTxOfEachSender() => Array.Empty(); + public bool IsKnown(Hash256 hash) => false; public bool TryGetPendingTransaction(Hash256 hash, [NotNullWhen(true)] out Transaction? transaction) diff --git a/src/Nethermind/Nethermind.TxPool/TxPool.cs b/src/Nethermind/Nethermind.TxPool/TxPool.cs index 05055748f69..65faf226e95 100644 --- a/src/Nethermind/Nethermind.TxPool/TxPool.cs +++ b/src/Nethermind/Nethermind.TxPool/TxPool.cs @@ -736,6 +736,10 @@ public UInt256 GetLatestPendingNonce(Address address) return maxPendingNonce; } + public Transaction? GetBestTx() => _transactions.GetBest(); + + public IEnumerable GetBestTxOfEachSender() => _transactions.GetFirsts(); + public bool IsKnown(Hash256? hash) => hash is not null ? _hashCache.Get(hash) : false; public event EventHandler? NewDiscovered;