From b96e46014aa0cdcf2b20cd90f1f72f227fb7abaf Mon Sep 17 00:00:00 2001 From: pk910 Date: Mon, 15 Jan 2024 14:10:44 +0100 Subject: [PATCH] made `generate_blob_transactions` & `generate_Transaction` tasks more resilent against RPC errors --- .../tasks/generate_blob_transactions/task.go | 34 ++++++++++++------- .../tasks/generate_transaction/task.go | 18 +++++++--- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/pkg/coordinator/tasks/generate_blob_transactions/task.go b/pkg/coordinator/tasks/generate_blob_transactions/task.go index ec9ca13..6fd0bbf 100644 --- a/pkg/coordinator/tasks/generate_blob_transactions/task.go +++ b/pkg/coordinator/tasks/generate_blob_transactions/task.go @@ -162,14 +162,6 @@ func (t *Task) Execute(ctx context.Context) error { totalCount := 0 for { - if pendingChan != nil { - select { - case <-ctx.Done(): - return nil - case pendingChan <- true: - } - } - txIndex := t.txIndex t.txIndex++ @@ -187,6 +179,14 @@ func (t *Task) Execute(ctx context.Context) error { if err != nil { t.logger.Errorf("error generating transaction: %v", err.Error()) } else { + if pendingChan != nil { + select { + case <-ctx.Done(): + return nil + case pendingChan <- true: + } + } + perBlockCount++ totalCount++ } @@ -319,13 +319,21 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c } } - client := clients[transactionIdx%uint64(len(clients))] + err = nil + + for i := 0; i < len(clients); i++ { + client := clients[(transactionIdx+uint64(i))%uint64(len(clients))] + + t.logger.WithFields(logrus.Fields{ + "client": client.GetName(), + }).Infof("sending tx %v: %v", transactionIdx, tx.Hash().Hex()) - t.logger.WithFields(logrus.Fields{ - "client": client.GetName(), - }).Infof("sending tx %v: %v", transactionIdx, tx.Hash().Hex()) + err = client.GetRPCClient().SendTransaction(ctx, tx) + if err == nil { + break + } + } - err = client.GetRPCClient().SendTransaction(ctx, tx) if err != nil { return err } diff --git a/pkg/coordinator/tasks/generate_transaction/task.go b/pkg/coordinator/tasks/generate_transaction/task.go index 3941367..24d226f 100644 --- a/pkg/coordinator/tasks/generate_transaction/task.go +++ b/pkg/coordinator/tasks/generate_transaction/task.go @@ -145,13 +145,21 @@ func (t *Task) Execute(ctx context.Context) error { } } - client := clients[0] + err = nil - t.logger.WithFields(logrus.Fields{ - "client": client.GetName(), - }).Infof("sending tx: %v", tx.Hash().Hex()) + for i := 0; i < len(clients); i++ { + client := clients[i%len(clients)] + + t.logger.WithFields(logrus.Fields{ + "client": client.GetName(), + }).Infof("sending tx: %v", tx.Hash().Hex()) + + err = client.GetRPCClient().SendTransaction(ctx, tx) + if err == nil { + break + } + } - err = client.GetRPCClient().SendTransaction(ctx, tx) if err != nil { return err }