Skip to content

Commit

Permalink
Fix appendOrMergeRPC inefficiency in message size recalculation
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Oct 1, 2024
1 parent f71345c commit 7abd338
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
15 changes: 14 additions & 1 deletion gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,14 @@ func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, q *rpcQueue, urgent bo
gs.tracer.SendRPC(rpc, p)
}

// maxProtobufOverhead denotes the protobuf encoding overhead for a message.
// it is based on the RPC.Size function excerpt:
// l = e.Size()
// n += 1 + l + sovRpc(uint64(l))
// where sovRpc is a number of bytes needed to encode some uint64 value.
// Assuming that the message size is 10^10, the number of bytes needed to encode it is 5.
const maxProtobufOverhead = 1 + 5

// appendOrMergeRPC appends the given RPCs to the slice, merging them if possible.
// If any elem is too large to fit in a single RPC, it will be split into multiple RPCs.
// If an RPC is too large and can't be split further (e.g. Message data is
Expand All @@ -1384,16 +1392,21 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {

for _, elem := range elems {
lastRPC := out[len(out)-1]
lastSize := lastRPC.Size()

// Merge/Append publish messages
// TODO: Never merge messages. The current behavior is the same as the
// old behavior. In the future let's not merge messages. Since,
// it may increase message latency.
for _, msg := range elem.GetPublish() {
if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.Size() > limit {
lastRPC.Publish = append(lastRPC.Publish, msg)
// do not use lastRPC.Size() here to avoid lastRPC.Publish iteration calling Size on each element.
lastSize += msg.Size() + maxProtobufOverhead
if lastSize > limit {
lastRPC.Publish = lastRPC.Publish[:len(lastRPC.Publish)-1]
lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
lastRPC.Publish = append(lastRPC.Publish, msg)
lastSize = lastRPC.Size() // single element calcualtion
out = append(out, lastRPC)
}
}
Expand Down
1 change: 1 addition & 0 deletions gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2416,6 +2416,7 @@ func TestFragmentRPCFunction(t *testing.T) {
ensureBelowLimit(results)
msgsPerRPC := limit / msgSize
expectedRPCs := nMessages / msgsPerRPC
expectedRPCs += 1 // add one more message to account for message size approximation when fragmenting
if len(results) != expectedRPCs {
t.Fatalf("expected %d RPC messages in output, got %d", expectedRPCs, len(results))
}
Expand Down

0 comments on commit 7abd338

Please sign in to comment.