diff --git a/pkg/coordinator/clients/execution/block.go b/pkg/coordinator/clients/execution/block.go index 9612536..79f9b50 100644 --- a/pkg/coordinator/clients/execution/block.go +++ b/pkg/coordinator/clients/execution/block.go @@ -1,6 +1,7 @@ package execution import ( + "context" "sort" "sync" "time" @@ -45,10 +46,15 @@ func (block *Block) GetBlock() *types.Block { return block.block } -func (block *Block) AwaitBlock(timeout time.Duration) *types.Block { +func (block *Block) AwaitBlock(ctx context.Context, timeout time.Duration) *types.Block { + if ctx == nil { + ctx = context.Background() + } + select { case <-block.blockChan: case <-time.After(timeout): + case <-ctx.Done(): } return block.block diff --git a/pkg/coordinator/clients/execution/pool.go b/pkg/coordinator/clients/execution/pool.go index 2507349..37d3570 100644 --- a/pkg/coordinator/clients/execution/pool.go +++ b/pkg/coordinator/clients/execution/pool.go @@ -3,8 +3,6 @@ package execution import ( "fmt" "sync" - - "github.com/ethereum/go-ethereum/common" ) type SchedulerMode uint8 @@ -30,9 +28,6 @@ type Pool struct { schedulerMode SchedulerMode schedulerMutex sync.Mutex rrLastIndexes map[ClientType]uint16 - - walletsMutex sync.Mutex - walletsMap map[common.Address]*Wallet } func NewPool(config *PoolConfig) (*Pool, error) { @@ -41,7 +36,6 @@ func NewPool(config *PoolConfig) (*Pool, error) { clients: make([]*Client, 0), forkCache: map[int64][]*HeadFork{}, rrLastIndexes: map[ClientType]uint16{}, - walletsMap: map[common.Address]*Wallet{}, } var err error @@ -84,6 +78,13 @@ func (pool *Pool) GetAllEndpoints() []*Client { } func (pool *Pool) GetReadyEndpoint(clientType ClientType) *Client { + readyClients := pool.GetReadyEndpoints() + selectedClient := pool.runClientScheduler(readyClients, clientType) + + return selectedClient +} + +func (pool *Pool) GetReadyEndpoints() []*Client { canonicalFork := pool.GetCanonicalFork(-1) if canonicalFork == nil { return nil @@ -94,9 +95,7 @@ func (pool *Pool) GetReadyEndpoint(clientType ClientType) *Client { return nil } - selectedClient := pool.runClientScheduler(readyClients, clientType) - - return selectedClient + return readyClients } func (pool *Pool) IsClientReady(client *Client) bool { diff --git a/pkg/coordinator/clients/execution/wallet.go b/pkg/coordinator/clients/execution/wallet.go deleted file mode 100644 index 6b00fd4..0000000 --- a/pkg/coordinator/clients/execution/wallet.go +++ /dev/null @@ -1,255 +0,0 @@ -package execution - -import ( - "bytes" - "context" - "crypto/ecdsa" - "errors" - "fmt" - "math/big" - "sync" - "time" - - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" - "github.com/sirupsen/logrus" -) - -type Wallet struct { - pool *Pool - address common.Address - privkey *ecdsa.PrivateKey - isReady bool - readyChan chan bool - nonce uint64 - balance *big.Int - txMutex sync.Mutex - - nonceListener bool - nonceChans map[uint64]chan bool - nonceMutex sync.Mutex -} - -type TxStatus struct { -} - -func (pool *Pool) GetWalletByPrivkey(privkey *ecdsa.PrivateKey) (*Wallet, error) { - publicKey := privkey.Public() - publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey) - - if !ok { - return nil, errors.New("error casting public key to ECDSA") - } - - address := crypto.PubkeyToAddress(*publicKeyECDSA) - wallet := pool.GetWalletByAddress(address) - - if wallet.privkey == nil { - wallet.privkey = privkey - } - - return wallet, nil -} - -func (pool *Pool) GetWalletByAddress(address common.Address) *Wallet { - pool.walletsMutex.Lock() - defer pool.walletsMutex.Unlock() - - wallet := pool.walletsMap[address] - if wallet == nil { - wallet = newWallet(pool, address) - pool.walletsMap[address] = wallet - } - - return wallet -} - -func newWallet(pool *Pool, address common.Address) *Wallet { - wallet := &Wallet{ - pool: pool, - address: address, - nonceChans: map[uint64]chan bool{}, - } - wallet.loadState() - - return wallet -} - -func (wallet *Wallet) loadState() { - wallet.readyChan = make(chan bool) - - go func() { - for { - client := wallet.pool.GetReadyEndpoint(UnspecifiedClient) - if client == nil { - time.Sleep(500 * time.Millisecond) - continue - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - - nonce, err := client.GetRPCClient().GetNonceAt(ctx, wallet.address, nil) - if err != nil { - logrus.WithError(err).Warnf("could not get last noce for wallet %v", wallet.address.String()) - cancel() - - continue - } - - balance, err := client.GetRPCClient().GetBalanceAt(ctx, wallet.address, nil) - if err != nil { - logrus.WithError(err).Warnf("could not get balance for wallet %v", wallet.address.String()) - cancel() - - continue - } - - wallet.nonce = nonce - wallet.balance = balance - wallet.isReady = true - close(wallet.readyChan) - cancel() - - break - } - }() -} - -func (wallet *Wallet) GetAddress() common.Address { - return wallet.address -} - -func (wallet *Wallet) AwaitReady(ctx context.Context) error { - select { - case <-ctx.Done(): - return ctx.Err() - case <-wallet.readyChan: - } - - return nil -} - -func (wallet *Wallet) BuildTransaction(ctx context.Context, buildFn func(ctx context.Context, nonce uint64, signer bind.SignerFn) (*types.Transaction, error)) (*types.Transaction, error) { - err := wallet.AwaitReady(ctx) - if err != nil { - return nil, err - } - - wallet.txMutex.Lock() - defer wallet.txMutex.Unlock() - - signer := types.LatestSignerForChainID(wallet.pool.blockCache.GetChainID()) - nonce := wallet.nonce - tx, err := buildFn(ctx, nonce, func(addr common.Address, tx *types.Transaction) (*types.Transaction, error) { - if !bytes.Equal(addr[:], wallet.address[:]) { - return nil, fmt.Errorf("cannot sign for another wallet") - } - - signedTx, serr := types.SignTx(tx, signer, wallet.privkey) - if serr != nil { - return nil, serr - } - - return signedTx, nil - }) - - if err != nil { - return nil, err - } - - signedTx, err := types.SignTx(tx, signer, wallet.privkey) - - if err != nil { - return nil, err - } - - wallet.nonce++ - - return signedTx, nil -} - -func (wallet *Wallet) AwaitTransaction(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) { - nonceChan := wallet.getNonceIncreaseChan(tx.Nonce() + 1) - if nonceChan != nil { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-nonceChan: - } - } - - client := wallet.pool.GetCanonicalFork(0).ReadyClients[0] - - return client.GetRPCClient().GetTransactionReceipt(ctx, tx.Hash()) -} - -func (wallet *Wallet) getNonceIncreaseChan(targetNonce uint64) chan bool { - wallet.nonceMutex.Lock() - defer wallet.nonceMutex.Unlock() - - nonceChan := wallet.nonceChans[targetNonce] - if nonceChan != nil { - return nonceChan - } - - nonceChan = make(chan bool) - wallet.nonceChans[targetNonce] = nonceChan - - if !wallet.nonceListener { - wallet.nonceListener = true - - go wallet.runNonceIncreaseLoop() - } - - return nonceChan -} - -func (wallet *Wallet) runNonceIncreaseLoop() { - <-wallet.readyChan - - blockSubscription := wallet.pool.blockCache.blockDispatcher.Subscribe(10) - defer blockSubscription.Unsubscribe() - - lastNonce := uint64(0) - awaitNext := true - - for awaitNext { - block := <-blockSubscription.Channel() - client := block.GetSeenBy()[0] - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - nonce, err := client.GetRPCClient().GetNonceAt(ctx, wallet.address, nil) - - cancel() - - if err != nil { - logrus.WithError(err).Warnf("could not get last noce for wallet %v", wallet.address.String()) - continue - } - - if nonce == lastNonce { - continue - } - - wallet.nonceMutex.Lock() - awaitNext = false - lastNonce = nonce - - for n, c := range wallet.nonceChans { - if n <= nonce { - close(c) - delete(wallet.nonceChans, n) - } else { - awaitNext = true - } - } - - if !awaitNext { - wallet.nonceListener = false - } - - wallet.nonceMutex.Unlock() - } -} diff --git a/pkg/coordinator/coordinator.go b/pkg/coordinator/coordinator.go index ba68a6c..919f147 100644 --- a/pkg/coordinator/coordinator.go +++ b/pkg/coordinator/coordinator.go @@ -12,6 +12,7 @@ import ( "github.com/ethpandaops/assertoor/pkg/coordinator/test" "github.com/ethpandaops/assertoor/pkg/coordinator/types" "github.com/ethpandaops/assertoor/pkg/coordinator/vars" + "github.com/ethpandaops/assertoor/pkg/coordinator/wallet" "github.com/ethpandaops/assertoor/pkg/coordinator/web/server" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" @@ -22,6 +23,7 @@ type Coordinator struct { Config *Config log logrus.FieldLogger clientPool *clients.ClientPool + walletManager *wallet.Manager webserver *server.WebServer validatorNames *names.ValidatorNames tests []types.Test @@ -54,6 +56,7 @@ func (c *Coordinator) Run(ctx context.Context) error { } c.clientPool = clientPool + c.walletManager = wallet.NewManager(clientPool.GetExecutionPool(), c.log.WithField("module", "wallet")) for idx := range c.Config.Endpoints { err = clientPool.AddClient(&c.Config.Endpoints[idx]) @@ -128,6 +131,10 @@ func (c *Coordinator) ClientPool() *clients.ClientPool { return c.clientPool } +func (c *Coordinator) WalletManager() *wallet.Manager { + return c.walletManager +} + func (c *Coordinator) ValidatorNames() *names.ValidatorNames { return c.validatorNames } diff --git a/pkg/coordinator/tasks/generate_deposits/task.go b/pkg/coordinator/tasks/generate_deposits/task.go index 5e13ada..498ac2a 100644 --- a/pkg/coordinator/tasks/generate_deposits/task.go +++ b/pkg/coordinator/tasks/generate_deposits/task.go @@ -271,7 +271,7 @@ func (t *Task) generateDeposit(ctx context.Context, accountIdx uint64, validator return fmt.Errorf("cannot create bound instance of DepositContract: %w", err) } - wallet, err := clientPool.GetExecutionPool().GetWalletByPrivkey(t.walletPrivKey) + wallet, err := t.ctx.Scheduler.GetCoordinator().WalletManager().GetWalletByPrivkey(t.walletPrivKey) if err != nil { return fmt.Errorf("cannot initialize wallet: %w", err) } diff --git a/pkg/coordinator/tasks/generate_eoa_transactions/config.go b/pkg/coordinator/tasks/generate_eoa_transactions/config.go new file mode 100644 index 0000000..6e0e9b0 --- /dev/null +++ b/pkg/coordinator/tasks/generate_eoa_transactions/config.go @@ -0,0 +1,60 @@ +package generateeoatransactions + +import ( + "errors" + "math/big" +) + +type Config struct { + LimitPerBlock int `yaml:"limitPerBlock" json:"limitPerBlock"` + LimitTotal int `yaml:"limitTotal" json:"limitTotal"` + LimitPending int `yaml:"limitPending" json:"limitPending"` + PrivateKey string `yaml:"privateKey" json:"privateKey"` + ChildWallets uint64 `yaml:"childWallets" json:"childWallets"` + WalletSeed string `yaml:"walletSeed" json:"walletSeed"` + + RefillPendingLimit uint64 `yaml:"refillPendingLimit" json:"refillPendingLimit"` + RefillFeeCap *big.Int `yaml:"refillFeeCap" json:"refillFeeCap"` + RefillTipCap *big.Int `yaml:"refillTipCap" json:"refillTipCap"` + RefillAmount *big.Int `yaml:"refillAmount" json:"refillAmount"` + RefillMinBalance *big.Int `yaml:"refillMinBalance" json:"refillMinBalance"` + + LegacyTransactions bool `yaml:"legacyTransactions" json:"legacyTransactions"` + TransactionFeeCap *big.Int `yaml:"transactionFeeCap" json:"transactionFeeCap"` + TransactionTipCap *big.Int `yaml:"transactionTipCap" json:"transactionTipCap"` + TransactionGasLimit uint64 `yaml:"transactionGasLimit" json:"transactionGasLimit"` + TargetAddress string `yaml:"targetAddress" json:"targetAddress"` + RandomTarget bool `yaml:"randomTarget" json:"randomTarget"` + ContractDeployment bool `yaml:"contractDeployment" json:"contractDeployment"` + TransactionData string `yaml:"transactionData" json:"transactionData"` + RandomAmount bool `yaml:"randomAmount" json:"randomAmount"` + TransactionAmount *big.Int `yaml:"transactionAmount" json:"transactionAmount"` + + ClientPattern string `yaml:"clientPattern" json:"clientPattern"` +} + +func DefaultConfig() Config { + return Config{ + RefillPendingLimit: 200, + RefillFeeCap: big.NewInt(500000000000), // 500 Gwei + RefillTipCap: big.NewInt(1000000000), // 1 Gwei + RefillAmount: big.NewInt(1000000000000000000), // 1 ETH + RefillMinBalance: big.NewInt(500000000000000000), // 0.5 ETH + TransactionFeeCap: big.NewInt(100000000000), // 100 Gwei + TransactionTipCap: big.NewInt(1000000000), // 1 Gwei + TransactionGasLimit: 50000, + TransactionAmount: big.NewInt(0), + } +} + +func (c *Config) Validate() error { + if c.LimitPerBlock == 0 && c.LimitTotal == 0 && c.LimitPending == 0 { + return errors.New("either limitPerBlock or limitTotal or limitPending must be set") + } + + if c.PrivateKey == "" { + return errors.New("privateKey must be set") + } + + return nil +} diff --git a/pkg/coordinator/tasks/generate_eoa_transactions/task.go b/pkg/coordinator/tasks/generate_eoa_transactions/task.go new file mode 100644 index 0000000..632a6cd --- /dev/null +++ b/pkg/coordinator/tasks/generate_eoa_transactions/task.go @@ -0,0 +1,341 @@ +package generateeoatransactions + +import ( + "context" + "crypto/rand" + "fmt" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethpandaops/assertoor/pkg/coordinator/clients/execution" + "github.com/ethpandaops/assertoor/pkg/coordinator/types" + "github.com/ethpandaops/assertoor/pkg/coordinator/wallet" + "github.com/sirupsen/logrus" +) + +var ( + TaskName = "generate_eoa_transactions" + TaskDescriptor = &types.TaskDescriptor{ + Name: TaskName, + Description: "Generates normal eoa transactions and sends them to the network", + Config: DefaultConfig(), + NewTask: NewTask, + } +) + +type Task struct { + ctx *types.TaskContext + options *types.TaskOptions + config Config + logger logrus.FieldLogger + txIndex uint64 + wallet *wallet.Wallet + walletPool *wallet.WalletPool + + targetAddr common.Address + transactionData []byte +} + +func NewTask(ctx *types.TaskContext, options *types.TaskOptions) (types.Task, error) { + return &Task{ + ctx: ctx, + options: options, + logger: ctx.Logger.GetLogger(), + }, nil +} + +func (t *Task) Name() string { + return TaskName +} + +func (t *Task) Description() string { + return TaskDescriptor.Description +} + +func (t *Task) Title() string { + return t.ctx.Vars.ResolvePlaceholders(t.options.Title) +} + +func (t *Task) Config() interface{} { + return t.config +} + +func (t *Task) Logger() logrus.FieldLogger { + return t.logger +} + +func (t *Task) Timeout() time.Duration { + return t.options.Timeout.Duration +} + +func (t *Task) LoadConfig() error { + config := DefaultConfig() + + // parse static config + if t.options.Config != nil { + if err := t.options.Config.Unmarshal(&config); err != nil { + return fmt.Errorf("error parsing task config for %v: %w", TaskName, err) + } + } + + // load dynamic vars + err := t.ctx.Vars.ConsumeVars(&config, t.options.ConfigVars) + if err != nil { + return err + } + + // validate config + if valerr := config.Validate(); valerr != nil { + return valerr + } + + // load wallets + privKey, err := crypto.HexToECDSA(config.PrivateKey) + if err != nil { + return err + } + + if config.ChildWallets == 0 { + t.wallet, err = t.ctx.Scheduler.GetCoordinator().WalletManager().GetWalletByPrivkey(privKey) + if err != nil { + return fmt.Errorf("cannot initialize wallet: %w", err) + } + } else { + t.walletPool, err = t.ctx.Scheduler.GetCoordinator().WalletManager().GetWalletPoolByPrivkey(privKey, config.ChildWallets, config.WalletSeed) + if err != nil { + return fmt.Errorf("cannot initialize wallet pool: %w", err) + } + } + + // parse target addr + if config.TargetAddress != "" { + err = t.targetAddr.UnmarshalText([]byte(config.TargetAddress)) + if err != nil { + return fmt.Errorf("cannot decode execution addr: %w", err) + } + } + + // parse transaction data + if config.TransactionData != "" { + t.transactionData = common.FromHex(config.TransactionData) + } + + t.config = config + + return nil +} + +func (t *Task) Execute(ctx context.Context) error { + if t.walletPool != nil { + err := t.ensureChildWalletFunding(ctx) + if err != nil { + t.logger.Infof("failed ensuring child wallet funding: %v", err) + return err + } + + for idx, wallet := range t.walletPool.GetChildWallets() { + t.logger.Infof("wallet #%v: %v [nonce: %v]", idx, wallet.GetAddress().Hex(), wallet.GetNonce()) + } + + go t.runChildWalletFundingRoutine(ctx) + } + + var subscription *execution.Subscription[*execution.Block] + if t.config.LimitPerBlock > 0 { + subscription = t.ctx.Scheduler.GetCoordinator().ClientPool().GetExecutionPool().GetBlockCache().SubscribeBlockEvent(10) + defer subscription.Unsubscribe() + } + + var pendingChan chan bool + + if t.config.LimitPending > 0 { + pendingChan = make(chan bool, t.config.LimitPending) + } + + perBlockCount := 0 + totalCount := 0 + + for { + if pendingChan != nil { + select { + case <-ctx.Done(): + return nil + case pendingChan <- true: + } + } + + txIndex := t.txIndex + t.txIndex++ + + err := t.generateTransaction(ctx, txIndex, func(tx *ethtypes.Transaction, receipt *ethtypes.Receipt) { + if pendingChan != nil { + <-pendingChan + } + + if receipt != nil { + t.logger.Infof("transaction %v confirmed (nonce: %v, status: %v)", tx.Hash().Hex(), tx.Nonce(), receipt.Status) + } else { + t.logger.Infof("transaction %v replaced (nonce: %v)", tx.Hash().Hex(), tx.Nonce()) + } + }) + if err != nil { + t.logger.Errorf("error generating transaction: %v", err.Error()) + } else { + perBlockCount++ + totalCount++ + } + + if t.config.LimitTotal > 0 && totalCount >= t.config.LimitTotal { + break + } + + if t.config.LimitPerBlock > 0 && perBlockCount >= t.config.LimitPerBlock { + // await next block + perBlockCount = 0 + select { + case <-ctx.Done(): + return nil + case <-subscription.Channel(): + } + } else if err := ctx.Err(); err != nil { + return err + } + } + + return nil +} + +func (t *Task) runChildWalletFundingRoutine(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(10 * time.Minute): + err := t.ensureChildWalletFunding(ctx) + if err != nil { + t.logger.Infof("failed ensuring child wallet funding: %v", err) + } + } + } +} + +func (t *Task) ensureChildWalletFunding(ctx context.Context) error { + t.logger.Infof("ensure child wallet funding") + + err := t.walletPool.EnsureFunding(ctx, t.config.RefillMinBalance, t.config.RefillAmount, t.config.RefillFeeCap, t.config.RefillTipCap, t.config.RefillPendingLimit) + if err != nil { + return err + } + + return nil +} + +func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, confirmedFn func(tx *ethtypes.Transaction, receipt *ethtypes.Receipt)) error { + txWallet := t.wallet + if t.wallet == nil { + txWallet = t.walletPool.GetNextChildWallet() + } + + tx, err := txWallet.BuildTransaction(ctx, func(ctx context.Context, nonce uint64, signer bind.SignerFn) (*ethtypes.Transaction, error) { + var toAddr *common.Address + + if !t.config.ContractDeployment { + addr := txWallet.GetAddress() + if t.config.RandomTarget { + addrBytes := make([]byte, 20) + //nolint:errcheck // ignore + rand.Read(addrBytes) + addr = common.Address(addrBytes) + } else if t.config.TargetAddress != "" { + addr = t.targetAddr + } + + toAddr = &addr + } + + txAmount := new(big.Int).Set(t.config.TransactionAmount) + if t.config.RandomAmount { + n, err := rand.Int(rand.Reader, txAmount) + if err == nil { + txAmount = n + } + } + + txData := []byte{} + if t.transactionData != nil { + txData = t.transactionData + } + + var txObj ethtypes.TxData + + if t.config.LegacyTransactions { + txObj = ðtypes.LegacyTx{ + Nonce: nonce, + GasPrice: t.config.TransactionFeeCap, + Gas: t.config.TransactionGasLimit, + To: toAddr, + Value: txAmount, + Data: txData, + } + } else { + txObj = ðtypes.DynamicFeeTx{ + ChainID: t.ctx.Scheduler.GetCoordinator().ClientPool().GetExecutionPool().GetBlockCache().GetChainID(), + Nonce: nonce, + GasTipCap: t.config.TransactionTipCap, + GasFeeCap: t.config.TransactionFeeCap, + Gas: t.config.TransactionGasLimit, + To: toAddr, + Value: txAmount, + Data: txData, + } + } + return ethtypes.NewTx(txObj), nil + }) + if err != nil { + return err + } + + var clients []*execution.Client + + clientPool := t.ctx.Scheduler.GetCoordinator().ClientPool() + + if t.config.ClientPattern == "" { + clients = clientPool.GetExecutionPool().GetReadyEndpoints() + } else { + poolClients := clientPool.GetClientsByNamePatterns([]string{t.config.ClientPattern}) + if len(poolClients) == 0 { + return fmt.Errorf("no client found with pattern %v", t.config.ClientPattern) + } + + clients = make([]*execution.Client, len(poolClients)) + for i, c := range poolClients { + clients[i] = c.ExecutionClient + } + } + + client := clients[transactionIdx%uint64(len(clients))] + + t.logger.WithFields(logrus.Fields{ + "client": client.GetName(), + }).Infof("sending tx %v: %v", transactionIdx, tx.Hash().Hex()) + + err = client.GetRPCClient().SendTransaction(ctx, tx) + if err != nil { + return err + } + + go func() { + receipt, err := txWallet.AwaitTransaction(ctx, tx) + if err != nil { + t.logger.Warnf("failed waiting for tx receipt: %v", err) + } + + confirmedFn(tx, receipt) + }() + + return nil +} diff --git a/pkg/coordinator/tasks/tasks.go b/pkg/coordinator/tasks/tasks.go index 5b78da3..5b47732 100644 --- a/pkg/coordinator/tasks/tasks.go +++ b/pkg/coordinator/tasks/tasks.go @@ -14,6 +14,7 @@ import ( checkexecutionsyncstatus "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_execution_sync_status" generateblschanges "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/generate_bls_changes" generatedeposits "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/generate_deposits" + generateeoatransactions "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/generate_eoa_transactions" generateexits "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/generate_exits" generateslashings "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/generate_slashings" runcommand "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/run_command" @@ -36,6 +37,7 @@ var AvailableTaskDescriptors = []*types.TaskDescriptor{ checkconsensusvalidatorstatus.TaskDescriptor, checkexecutionsyncstatus.TaskDescriptor, generateblschanges.TaskDescriptor, + generateeoatransactions.TaskDescriptor, generatedeposits.TaskDescriptor, generateexits.TaskDescriptor, generateslashings.TaskDescriptor, diff --git a/pkg/coordinator/types/coordinator.go b/pkg/coordinator/types/coordinator.go index 8426643..0cb92b8 100644 --- a/pkg/coordinator/types/coordinator.go +++ b/pkg/coordinator/types/coordinator.go @@ -3,12 +3,14 @@ package types import ( "github.com/ethpandaops/assertoor/pkg/coordinator/clients" "github.com/ethpandaops/assertoor/pkg/coordinator/names" + "github.com/ethpandaops/assertoor/pkg/coordinator/wallet" "github.com/sirupsen/logrus" ) type Coordinator interface { Logger() logrus.FieldLogger ClientPool() *clients.ClientPool + WalletManager() *wallet.Manager ValidatorNames() *names.ValidatorNames NewVariables(parentScope Variables) Variables GetTests() []Test diff --git a/pkg/coordinator/wallet/manager.go b/pkg/coordinator/wallet/manager.go new file mode 100644 index 0000000..6c3fc76 --- /dev/null +++ b/pkg/coordinator/wallet/manager.go @@ -0,0 +1,113 @@ +package wallet + +import ( + "context" + "crypto/ecdsa" + "errors" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethpandaops/assertoor/pkg/coordinator/clients/execution" + "github.com/sirupsen/logrus" +) + +type Manager struct { + clientPool *execution.Pool + logger logrus.FieldLogger + + walletsMutex sync.Mutex + walletsMap map[common.Address]*Wallet +} + +func NewManager(clientPool *execution.Pool, logger logrus.FieldLogger) *Manager { + manager := &Manager{ + clientPool: clientPool, + logger: logger, + walletsMap: map[common.Address]*Wallet{}, + } + + go manager.runBlockTransactionsLoop() + + return manager +} + +func (manager *Manager) GetWalletByPrivkey(privkey *ecdsa.PrivateKey) (*Wallet, error) { + publicKey := privkey.Public() + publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey) + + if !ok { + return nil, errors.New("error casting public key to ECDSA") + } + + address := crypto.PubkeyToAddress(*publicKeyECDSA) + wallet := manager.GetWalletByAddress(address) + + if wallet.privkey == nil { + wallet.privkey = privkey + } + + return wallet, nil +} + +func (manager *Manager) GetWalletByAddress(address common.Address) *Wallet { + manager.walletsMutex.Lock() + defer manager.walletsMutex.Unlock() + + wallet := manager.walletsMap[address] + if wallet == nil { + wallet = manager.newWallet(address) + manager.walletsMap[address] = wallet + } + + return wallet +} + +func (manager *Manager) runBlockTransactionsLoop() { + blockSubscription := manager.clientPool.GetBlockCache().SubscribeBlockEvent(10) + defer blockSubscription.Unsubscribe() + + for block := range blockSubscription.Channel() { + manager.processBlockTransactions(block) + } +} + +func (manager *Manager) processBlockTransactions(block *execution.Block) { + blockData := block.AwaitBlock(context.Background(), 2*time.Second) + if blockData == nil { + return + } + + manager.walletsMutex.Lock() + + wallets := map[common.Address]*Wallet{} + for addr := range manager.walletsMap { + wallets[addr] = manager.walletsMap[addr] + } + + manager.walletsMutex.Unlock() + + signer := ethtypes.LatestSignerForChainID(manager.clientPool.GetBlockCache().GetChainID()) + for idx, tx := range blockData.Transactions() { + txFrom, err := ethtypes.Sender(signer, tx) + if err != nil { + manager.logger.Warnf("error decoding ts sender (block %v, tx %v): %v", block.Number, idx, err) + continue + } + + fromWallet := wallets[txFrom] + if fromWallet != nil { + fromWallet.processTransactionInclusion(block, tx) + } + + toAddr := tx.To() + if toAddr != nil { + toWallet := wallets[*toAddr] + if toWallet != nil { + toWallet.processTransactionReceival(block, tx) + } + } + } +} diff --git a/pkg/coordinator/wallet/wallet.go b/pkg/coordinator/wallet/wallet.go new file mode 100644 index 0000000..3b271d8 --- /dev/null +++ b/pkg/coordinator/wallet/wallet.go @@ -0,0 +1,287 @@ +package wallet + +import ( + "bytes" + "context" + "crypto/ecdsa" + "fmt" + "math/big" + "sync" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethpandaops/assertoor/pkg/coordinator/clients/execution" + "github.com/sirupsen/logrus" +) + +type Wallet struct { + manager *Manager + + address common.Address + privkey *ecdsa.PrivateKey + isReady bool + readyChan chan bool + + txBuildMutex sync.Mutex + pendingNonce uint64 + pendingBalance *big.Int + + confirmedNonce uint64 + confirmedBalance *big.Int + + txNonceChans map[uint64]*nonceStatus + txNonceMutex sync.Mutex +} + +type nonceStatus struct { + receipt *types.Receipt + channel chan bool +} + +func (manager *Manager) newWallet(address common.Address) *Wallet { + wallet := &Wallet{ + manager: manager, + address: address, + txNonceChans: map[uint64]*nonceStatus{}, + } + wallet.loadState() + + return wallet +} + +func (wallet *Wallet) loadState() { + wallet.readyChan = make(chan bool) + + go func() { + for { + client := wallet.manager.clientPool.GetReadyEndpoint(execution.UnspecifiedClient) + if client == nil { + time.Sleep(500 * time.Millisecond) + continue + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + + nonce, err := client.GetRPCClient().GetNonceAt(ctx, wallet.address, nil) + if err != nil { + logrus.WithError(err).Warnf("could not get last noce for wallet %v", wallet.address.String()) + cancel() + + continue + } + + balance, err := client.GetRPCClient().GetBalanceAt(ctx, wallet.address, nil) + if err != nil { + logrus.WithError(err).Warnf("could not get balance for wallet %v", wallet.address.String()) + cancel() + + continue + } + + wallet.pendingNonce = nonce + wallet.confirmedNonce = nonce + wallet.pendingBalance = new(big.Int).Set(balance) + wallet.confirmedBalance = new(big.Int).Set(balance) + wallet.isReady = true + close(wallet.readyChan) + cancel() + + break + } + }() +} + +func (wallet *Wallet) GetAddress() common.Address { + return wallet.address +} + +func (wallet *Wallet) GetPrivateKey() *ecdsa.PrivateKey { + return wallet.privkey +} + +func (wallet *Wallet) GetBalance() *big.Int { + return wallet.confirmedBalance +} + +func (wallet *Wallet) GetPendingBalance() *big.Int { + return wallet.pendingBalance +} + +func (wallet *Wallet) GetNonce() uint64 { + return wallet.confirmedNonce +} + +func (wallet *Wallet) AwaitReady(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-wallet.readyChan: + } + + return nil +} + +func (wallet *Wallet) BuildTransaction(ctx context.Context, buildFn func(ctx context.Context, nonce uint64, signer bind.SignerFn) (*types.Transaction, error)) (*types.Transaction, error) { + err := wallet.AwaitReady(ctx) + if err != nil { + return nil, err + } + + wallet.txBuildMutex.Lock() + defer wallet.txBuildMutex.Unlock() + + signer := types.LatestSignerForChainID(wallet.manager.clientPool.GetBlockCache().GetChainID()) + nonce := wallet.pendingNonce + tx, err := buildFn(ctx, nonce, func(addr common.Address, tx *types.Transaction) (*types.Transaction, error) { + if !bytes.Equal(addr[:], wallet.address[:]) { + return nil, fmt.Errorf("cannot sign for another wallet") + } + + signedTx, serr := types.SignTx(tx, signer, wallet.privkey) + if serr != nil { + return nil, serr + } + + return signedTx, nil + }) + + if err != nil { + return nil, err + } + + signedTx, err := types.SignTx(tx, signer, wallet.privkey) + + if err != nil { + return nil, err + } + + wallet.pendingNonce++ + + return signedTx, nil +} + +func (wallet *Wallet) AwaitTransaction(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) { + err := wallet.AwaitReady(ctx) + if err != nil { + return nil, err + } + + txHash := tx.Hash() + nonceChan := wallet.getTxNonceChan(tx.Nonce()) + + if nonceChan != nil { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-nonceChan.channel: + } + + receipt := nonceChan.receipt + if receipt != nil { + if bytes.Equal(receipt.TxHash[:], txHash[:]) { + return receipt, nil + } + + return nil, nil + } + } + + client := wallet.manager.clientPool.GetCanonicalFork(0).ReadyClients[0] + + return client.GetRPCClient().GetTransactionReceipt(ctx, txHash) +} + +func (wallet *Wallet) getTxNonceChan(targetNonce uint64) *nonceStatus { + wallet.txNonceMutex.Lock() + defer wallet.txNonceMutex.Unlock() + + nonceChan := wallet.txNonceChans[targetNonce] + if nonceChan != nil { + return nonceChan + } + + nonceChan = &nonceStatus{ + channel: make(chan bool), + } + wallet.txNonceChans[targetNonce] = nonceChan + + return nonceChan +} + +func (wallet *Wallet) processTransactionInclusion(block *execution.Block, tx *types.Transaction) { + if !wallet.isReady { + return + } + + receipt := wallet.loadTransactionReceipt(block, tx) + nonce := tx.Nonce() + 1 + + wallet.txNonceMutex.Lock() + defer wallet.txNonceMutex.Unlock() + + if wallet.confirmedNonce >= nonce { + return + } + + if receipt != nil { + wallet.confirmedBalance = wallet.confirmedBalance.Sub(wallet.confirmedBalance, tx.Value()) + txFee := new(big.Int).Mul(receipt.EffectiveGasPrice, big.NewInt(int64(receipt.GasUsed))) + wallet.confirmedBalance = wallet.confirmedBalance.Sub(wallet.confirmedBalance, txFee) + wallet.pendingBalance = wallet.pendingBalance.Sub(wallet.pendingBalance, txFee) + } + + for n := range wallet.txNonceChans { + if n == nonce-1 { + wallet.txNonceChans[n].receipt = receipt + } + + if n < nonce { + close(wallet.txNonceChans[n].channel) + delete(wallet.txNonceChans, n) + } + } + + wallet.confirmedNonce = nonce + if wallet.confirmedNonce > wallet.pendingNonce { + wallet.pendingNonce = wallet.confirmedNonce + wallet.pendingBalance = new(big.Int).Set(wallet.confirmedBalance) + } +} + +func (wallet *Wallet) processTransactionReceival(_ *execution.Block, tx *types.Transaction) { + if !wallet.isReady { + return + } + + wallet.pendingBalance = wallet.pendingBalance.Add(wallet.pendingBalance, tx.Value()) + wallet.confirmedBalance = wallet.confirmedBalance.Add(wallet.confirmedBalance, tx.Value()) +} + +func (wallet *Wallet) loadTransactionReceipt(block *execution.Block, tx *types.Transaction) *types.Receipt { + retryCount := uint64(0) + + for { + clients := block.GetSeenBy() + cliIdx := retryCount % uint64(len(clients)) + client := clients[cliIdx] + + receipt, err := client.GetRPCClient().GetTransactionReceipt(context.Background(), tx.Hash()) + if err == nil { + return receipt + } + + wallet.manager.logger.WithFields(logrus.Fields{ + "client": client.GetName(), + "txhash": tx.Hash(), + }).Warnf("could not load tx receipt: %v", err) + + if retryCount < 5 { + time.Sleep(1 * time.Second) + retryCount++ + } else { + return nil + } + } +} diff --git a/pkg/coordinator/wallet/walletpool.go b/pkg/coordinator/wallet/walletpool.go new file mode 100644 index 0000000..69d7aa4 --- /dev/null +++ b/pkg/coordinator/wallet/walletpool.go @@ -0,0 +1,186 @@ +package wallet + +import ( + "context" + "crypto/ecdsa" + "crypto/sha256" + "encoding/binary" + "math/big" + "sync" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/sirupsen/logrus" +) + +//nolint:revive // ignore +type WalletPool struct { + manager *Manager + logger logrus.FieldLogger + rootWallet *Wallet + wallets []*Wallet + nextIdx uint64 +} + +func (manager *Manager) GetWalletPoolByPrivkey(privkey *ecdsa.PrivateKey, walletCount uint64, childSeed string) (*WalletPool, error) { + rootWallet, err := manager.GetWalletByPrivkey(privkey) + if err != nil { + return nil, err + } + + pool := &WalletPool{ + manager: manager, + logger: manager.logger, + rootWallet: rootWallet, + wallets: make([]*Wallet, walletCount), + } + + for i := uint64(0); i < walletCount; i++ { + wallet, err := pool.newChildWallet(i, childSeed) + if err != nil { + return nil, err + } + + pool.wallets[i] = wallet + } + + return pool, nil +} + +func (pool *WalletPool) newChildWallet(childIdx uint64, childSeed string) (*Wallet, error) { + idxBytes := make([]byte, 8) + binary.BigEndian.PutUint64(idxBytes, childIdx) + + if childSeed != "" { + seedBytes := []byte(childSeed) + idxBytes = append(idxBytes, seedBytes...) + } + + childKeyBytes := sha256.Sum256(append(crypto.FromECDSA(pool.rootWallet.privkey), idxBytes...)) + + childKey, err := crypto.ToECDSA(childKeyBytes[:]) + if err != nil { + return nil, err + } + + return pool.manager.GetWalletByPrivkey(childKey) +} + +func (pool *WalletPool) GetRootWallet() *Wallet { + return pool.rootWallet +} + +func (pool *WalletPool) GetChildWallets() []*Wallet { + return pool.wallets +} + +func (pool *WalletPool) GetChildWallet(index uint64) *Wallet { + if index >= uint64(len(pool.wallets)) { + return nil + } + + return pool.wallets[index] +} + +func (pool *WalletPool) GetNextChildWallet() *Wallet { + wallet := pool.wallets[pool.nextIdx] + pool.nextIdx++ + + if pool.nextIdx >= uint64(len(pool.wallets)) { + pool.nextIdx = 0 + } + + return wallet +} + +func (pool *WalletPool) EnsureFunding(ctx context.Context, minBalance, refillAmount, gasFeeCap, gasTipCap *big.Int, pendingLimit uint64) error { + refillTxs := []*types.Transaction{} + + var refillError error + + for _, wallet := range pool.wallets { + err := wallet.AwaitReady(ctx) + if err != nil { + return err + } + + if wallet.GetPendingBalance().Cmp(minBalance) >= 1 { + continue + } + + tx, err := pool.rootWallet.BuildTransaction(ctx, func(ctx context.Context, nonce uint64, signer bind.SignerFn) (*types.Transaction, error) { + toAddr := wallet.GetAddress() + txData := &types.DynamicFeeTx{ + ChainID: pool.manager.clientPool.GetBlockCache().GetChainID(), + Nonce: nonce, + GasTipCap: gasTipCap, + GasFeeCap: gasFeeCap, + Gas: 50000, + To: &toAddr, + Value: refillAmount, + } + return types.NewTx(txData), nil + }) + if err != nil { + pool.logger.Warnf("failed creating child wallet refill tx: %v", err) + refillError = err + + continue + } + + refillTxs = append(refillTxs, tx) + } + + refillTxCount := uint64(len(refillTxs)) + if refillTxCount == 0 { + if refillError != nil { + return refillError + } + + return nil + } + + sentIdx := uint64(0) + headFork := pool.manager.clientPool.GetCanonicalFork(0) + client := headFork.ReadyClients[0] + txChan := make(chan bool, pendingLimit) + txWg := sync.WaitGroup{} + + for sentIdx < refillTxCount { + select { + case txChan <- true: + case <-ctx.Done(): + return ctx.Err() + } + + tx := refillTxs[sentIdx] + sentIdx++ + + err := client.GetRPCClient().SendTransaction(ctx, tx) + if err != nil { + pool.logger.Warnf("failed sensing child wallet refill tx: %v", err) + refillError = err + + break + } + + txWg.Add(1) + + go func(tx *types.Transaction) { + _, err := pool.rootWallet.AwaitTransaction(ctx, tx) + + if err != nil { + pool.logger.Warnf("failed awaiting child wallet refill tx: %v", err) + refillError = err + } + + <-txChan + txWg.Done() + }(tx) + } + + txWg.Wait() + + return refillError +}