Skip to content

eth: check blob transaction validity on the peer goroutine when received #31219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/devp2p/internal/ethtest/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,16 @@ func (c *Conn) Write(proto Proto, code uint64, msg any) error {
return err
}

var errDisc error = fmt.Errorf("disconnect")

// ReadEth reads an Eth sub-protocol wire message.
func (c *Conn) ReadEth() (any, error) {
c.SetReadDeadline(time.Now().Add(timeout))
for {
code, data, _, err := c.Conn.Read()
if code == discMsg {
return nil, errDisc
}
if err != nil {
return nil, err
}
Expand Down
197 changes: 197 additions & 0 deletions cmd/devp2p/internal/ethtest/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
package ethtest

import (
"context"
"crypto/rand"
"fmt"
"reflect"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
Expand Down Expand Up @@ -79,6 +83,8 @@ func (s *Suite) EthTests() []utesting.Test {
{Name: "InvalidTxs", Fn: s.TestInvalidTxs},
{Name: "NewPooledTxs", Fn: s.TestNewPooledTxs},
{Name: "BlobViolations", Fn: s.TestBlobViolations},
{Name: "TestBlobTxWithoutSidecar", Fn: s.TestBlobTxWithoutSidecar},
{Name: "TestBlobTxWithMismatchedSidecar", Fn: s.TestBlobTxWithMismatchedSidecar},
}
}

Expand Down Expand Up @@ -825,3 +831,194 @@ func (s *Suite) TestBlobViolations(t *utesting.T) {
conn.Close()
}
}

// mangleSidecar returns a copy of the given blob transaction where the sidecar
// data has been modified to produce a different commitment hash.
func mangleSidecar(tx *types.Transaction) *types.Transaction {
sidecar := tx.BlobTxSidecar()
copy := types.BlobTxSidecar{
Blobs: append([]kzg4844.Blob{}, sidecar.Blobs...),
Commitments: append([]kzg4844.Commitment{}, sidecar.Commitments...),
Proofs: append([]kzg4844.Proof{}, sidecar.Proofs...),
}
// zero the first commitment to alter the sidecar hash
copy.Commitments[0] = kzg4844.Commitment{}
return tx.WithBlobTxSidecar(&copy)
}

func (s *Suite) TestBlobTxWithoutSidecar(t *utesting.T) {
t.Log(`This test checks that a blob transaction first advertised/transmitted without blobs will result in the sending peer being disconnected, and the full transaction should be successfully retrieved from another peer.`)
tx := s.makeBlobTxs(1, 2, 42)[0]
badTx := tx.WithoutBlobTxSidecar()
s.testBadBlobTx(t, tx, badTx)
}

func (s *Suite) TestBlobTxWithMismatchedSidecar(t *utesting.T) {
t.Log(`This test checks that a blob transaction first advertised/transmitted without blobs, whose commitment don't correspond to the blob_versioned_hashes in the transaction, will result in the sending peer being disconnected, and the full transaction should be successfully retrieved from another peer.`)
tx := s.makeBlobTxs(1, 2, 43)[0]
badTx := mangleSidecar(tx)
s.testBadBlobTx(t, tx, badTx)
}

// readUntil reads eth protocol messages until a message of the target type is
// received. It returns an error if there is a disconnect, or if the context
// is cancelled before a message of the desired type can be read.
func readUntil[T any](ctx context.Context, conn *Conn) (*T, error) {
for {
select {
case <-ctx.Done():
return nil, context.Canceled
default:
}
received, err := conn.ReadEth()
if err != nil {
if err == errDisc {
return nil, errDisc
}
continue
}

switch res := received.(type) {
case *T:
return res, nil
}
}
}

// readUntilDisconnect reads eth protocol messages until the peer disconnects.
// It returns whether the peer disconnects in the next 100ms.
func readUntilDisconnect(conn *Conn) (disconnected bool) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
_, err := readUntil[struct{}](ctx, conn)
return err == errDisc
}

func (s *Suite) testBadBlobTx(t *utesting.T, tx *types.Transaction, badTx *types.Transaction) {
stage1, stage2, stage3 := new(sync.WaitGroup), new(sync.WaitGroup), new(sync.WaitGroup)
stage1.Add(1)
stage2.Add(1)
stage3.Add(1)

errc := make(chan error)

badPeer := func() {
// announce the correct hash from the bad peer.
// when the transaction is first requested before transmitting it from the bad peer,
// trigger step 2: connection and announcement by good peers

conn, err := s.dial()
if err != nil {
errc <- fmt.Errorf("dial fail: %v", err)
return
}
defer conn.Close()

if err := conn.peer(s.chain, nil); err != nil {
errc <- fmt.Errorf("bad peer: peering failed: %v", err)
return
}

ann := eth.NewPooledTransactionHashesPacket{
Types: []byte{types.BlobTxType},
Sizes: []uint32{uint32(badTx.Size())},
Hashes: []common.Hash{badTx.Hash()},
}

if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil {
errc <- fmt.Errorf("sending announcement failed: %v", err)
return
}

req, err := readUntil[eth.GetPooledTransactionsPacket](context.Background(), conn)
if err != nil {
errc <- fmt.Errorf("failed to read GetPooledTransactions message: %v", err)
return
}

stage1.Done()
stage2.Wait()

// the good peer is connected, and has announced the tx.
// proceed to send the incorrect one from the bad peer.

resp := eth.PooledTransactionsPacket{RequestId: req.RequestId, PooledTransactionsResponse: eth.PooledTransactionsResponse(types.Transactions{badTx})}
if err := conn.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil {
errc <- fmt.Errorf("writing pooled tx response failed: %v", err)
return
}
if !readUntilDisconnect(conn) {
errc <- fmt.Errorf("expected bad peer to be disconnected")
return
}
stage3.Done()
}

goodPeer := func() {
stage1.Wait()

conn, err := s.dial()
if err != nil {
errc <- fmt.Errorf("dial fail: %v", err)
return
}
defer conn.Close()

if err := conn.peer(s.chain, nil); err != nil {
errc <- fmt.Errorf("peering failed: %v", err)
return
}

ann := eth.NewPooledTransactionHashesPacket{
Types: []byte{types.BlobTxType},
Sizes: []uint32{uint32(tx.Size())},
Hashes: []common.Hash{tx.Hash()},
}

if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil {
errc <- fmt.Errorf("sending announcement failed: %v", err)
return
}

// wait until the bad peer has transmitted the incorrect transaction
stage2.Done()
stage3.Wait()

// the bad peer has transmitted the bad tx, and been disconnected.
// transmit the same tx but with correct sidecar from the good peer.

var req *eth.GetPooledTransactionsPacket
req, err = readUntil[eth.GetPooledTransactionsPacket](context.Background(), conn)
if err != nil {
errc <- fmt.Errorf("reading pooled tx request failed: %v", err)
return
}

if req.GetPooledTransactionsRequest[0] != tx.Hash() {
errc <- fmt.Errorf("requested unknown tx hash")
return
}

resp := eth.PooledTransactionsPacket{RequestId: req.RequestId, PooledTransactionsResponse: eth.PooledTransactionsResponse(types.Transactions{tx})}
if err := conn.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil {
errc <- fmt.Errorf("writing pooled tx response failed: %v", err)
return
}
if readUntilDisconnect(conn) {
errc <- fmt.Errorf("unexpected disconnect")
return
}
close(errc)
}

if err := s.engine.sendForkchoiceUpdated(); err != nil {
t.Fatalf("send fcu failed: %v", err)
}

go goodPeer()
go badPeer()
err := <-errc
if err != nil {
t.Fatalf("%v", err)
}
}
14 changes: 2 additions & 12 deletions core/txpool/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package txpool

import (
"crypto/sha256"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -170,20 +169,11 @@ func validateBlobSidecar(hashes []common.Hash, sidecar *types.BlobTxSidecar) err
if len(sidecar.Blobs) != len(hashes) {
return fmt.Errorf("invalid number of %d blobs compared to %d blob hashes", len(sidecar.Blobs), len(hashes))
}
if len(sidecar.Commitments) != len(hashes) {
return fmt.Errorf("invalid number of %d blob commitments compared to %d blob hashes", len(sidecar.Commitments), len(hashes))
}
if len(sidecar.Proofs) != len(hashes) {
return fmt.Errorf("invalid number of %d blob proofs compared to %d blob hashes", len(sidecar.Proofs), len(hashes))
}
// Blob quantities match up, validate that the provers match with the
// transaction hash before getting to the cryptography
hasher := sha256.New()
for i, vhash := range hashes {
computed := kzg4844.CalcBlobHashV1(hasher, &sidecar.Commitments[i])
if vhash != computed {
return fmt.Errorf("blob %d: computed hash %#x mismatches transaction one %#x", i, computed, vhash)
}
if err := sidecar.ValidateBlobCommitmentHashes(hashes); err != nil {
return err
}
// Blob commitments match with the hashes in the transaction, verify the
// blobs themselves via KZG
Expand Down
17 changes: 17 additions & 0 deletions core/types/tx_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package types
import (
"bytes"
"crypto/sha256"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -85,6 +86,22 @@ func (sc *BlobTxSidecar) encodedSize() uint64 {
return rlp.ListSize(blobs) + rlp.ListSize(commitments) + rlp.ListSize(proofs)
}

// ValidateBlobCommitmentHashes checks whether the given hashes correspond to the
// commitments in the sidecar
func (sc *BlobTxSidecar) ValidateBlobCommitmentHashes(hashes []common.Hash) error {
if len(sc.Commitments) != len(hashes) {
return fmt.Errorf("invalid number of %d blob commitments compared to %d blob hashes", len(sc.Commitments), len(hashes))
}
hasher := sha256.New()
for i, vhash := range hashes {
computed := kzg4844.CalcBlobHashV1(hasher, &sc.Commitments[i])
if vhash != computed {
return fmt.Errorf("blob %d: computed hash %#x mismatches transaction one %#x", i, computed, vhash)
}
}
return nil
}

// blobTxWithBlobs is used for encoding of transactions when blobs are present.
type blobTxWithBlobs struct {
BlobTx *BlobTx
Expand Down
13 changes: 13 additions & 0 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
return h.txFetcher.Enqueue(peer.ID(), *packet, false)

case *eth.PooledTransactionsResponse:
// If we receive any blob transactions missing sidecars, or with
// sidecars that don't correspond to the versioned hashes reported
// in the header, disconnect from the sending peer.
for _, tx := range *packet {
if tx.Type() == types.BlobTxType {
if tx.BlobTxSidecar() == nil {
return errors.New("received sidecar-less blob transaction")
}
if err := tx.BlobTxSidecar().ValidateBlobCommitmentHashes(tx.BlobHashes()); err != nil {
return err
}
}
}
return h.txFetcher.Enqueue(peer.ID(), *packet, true)

default:
Expand Down