Skip to content

Commit 2dc3c78

Browse files
lmittmanndiwu1989
authored andcommitted
eth/filter, ethclient/gethclient: added fullTx-flag to NewPendingTransactions
* #24524
1 parent d99a04a commit 2dc3c78

File tree

5 files changed

+92
-44
lines changed

5 files changed

+92
-44
lines changed

eth/filters/api.go

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ import (
3939
// and associated subscription in the event system.
4040
type filter struct {
4141
typ Type
42-
deadline *time.Timer // filter is inactiv when deadline triggers
42+
deadline *time.Timer // filter is inactive when deadline triggers
4343
hashes []common.Hash
44+
txs []*types.Transaction
4445
crit FilterCriteria
4546
logs []*types.Log
4647
s *Subscription // associated subscription in event system
@@ -101,7 +102,7 @@ func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) {
101102
}
102103
}
103104

104-
// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
105+
// NewPendingTransactionFilter creates a filter that fetches pending transactions
105106
// as transactions enter the pending state.
106107
//
107108
// It is part of the filter package because this filter can be used through the
@@ -110,21 +111,21 @@ func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) {
110111
// https://eth.wiki/json-rpc/API#eth_newpendingtransactionfilter
111112
func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
112113
var (
113-
pendingTxs = make(chan []common.Hash)
114+
pendingTxs = make(chan []*types.Transaction)
114115
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
115116
)
116117

117118
api.filtersMu.Lock()
118-
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub}
119+
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub}
119120
api.filtersMu.Unlock()
120121

121122
go func() {
122123
for {
123124
select {
124-
case ph := <-pendingTxs:
125+
case pTx := <-pendingTxs:
125126
api.filtersMu.Lock()
126127
if f, found := api.filters[pendingTxSub.ID]; found {
127-
f.hashes = append(f.hashes, ph...)
128+
f.txs = append(f.txs, pTx...)
128129
}
129130
api.filtersMu.Unlock()
130131
case <-pendingTxSub.Err():
@@ -139,9 +140,10 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
139140
return pendingTxSub.ID
140141
}
141142

142-
// NewPendingTransactions creates a subscription that is triggered each time a transaction
143-
// enters the transaction pool and was signed from one of the transactions this nodes manages.
144-
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
143+
// NewPendingTransactions creates a subscription that is triggered each time a
144+
// transaction enters the transaction pool. If fullTx is true the full tx is
145+
// sent to the client, otherwise the hash is sent.
146+
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
145147
notifier, supported := rpc.NotifierFromContext(ctx)
146148
if !supported {
147149
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
@@ -150,16 +152,20 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su
150152
rpcSub := notifier.CreateSubscription()
151153

152154
go func() {
153-
txHashes := make(chan []common.Hash, 128)
154-
pendingTxSub := api.events.SubscribePendingTxs(txHashes)
155+
txs := make(chan []*types.Transaction, 128)
156+
pendingTxSub := api.events.SubscribePendingTxs(txs)
155157

156158
for {
157159
select {
158-
case hashes := <-txHashes:
160+
case txs := <-txs:
159161
// To keep the original behaviour, send a single tx hash in one notification.
160162
// TODO(rjl493456442) Send a batch of tx hashes in one notification
161-
for _, h := range hashes {
162-
notifier.Notify(rpcSub.ID, h)
163+
for _, tx := range txs {
164+
if fullTx != nil && *fullTx {
165+
notifier.Notify(rpcSub.ID, tx)
166+
} else {
167+
notifier.Notify(rpcSub.ID, tx.Hash())
168+
}
163169
}
164170
case <-rpcSub.Err():
165171
pendingTxSub.Unsubscribe()
@@ -440,10 +446,14 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
440446
f.deadline.Reset(api.timeout)
441447

442448
switch f.typ {
443-
case PendingTransactionsSubscription, BlocksSubscription:
449+
case BlocksSubscription:
444450
hashes := f.hashes
445451
f.hashes = nil
446452
return returnHashes(hashes), nil
453+
case PendingTransactionsSubscription:
454+
txs := f.txs
455+
f.txs = nil
456+
return txs, nil
447457
case LogsSubscription, MinedAndPendingLogsSubscription:
448458
logs := f.logs
449459
f.logs = nil

eth/filters/filter_system.go

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ const (
4747
PendingLogsSubscription
4848
// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
4949
MinedAndPendingLogsSubscription
50-
// PendingTransactionsSubscription queries tx hashes for pending
51-
// transactions entering the pending state
50+
// PendingTransactionsSubscription queries for pending transactions entering
51+
// the pending state
5252
PendingTransactionsSubscription
5353
// BlocksSubscription queries hashes for blocks that are imported
5454
BlocksSubscription
@@ -74,7 +74,7 @@ type subscription struct {
7474
created time.Time
7575
logsCrit ethereum.FilterQuery
7676
logs chan []*types.Log
77-
hashes chan []common.Hash
77+
txs chan []*types.Transaction
7878
headers chan *types.Header
7979
installed chan struct{} // closed when the filter is installed
8080
err chan error // closed when the filter is uninstalled
@@ -165,7 +165,7 @@ func (sub *Subscription) Unsubscribe() {
165165
case sub.es.uninstall <- sub.f:
166166
break uninstallLoop
167167
case <-sub.f.logs:
168-
case <-sub.f.hashes:
168+
case <-sub.f.txs:
169169
case <-sub.f.headers:
170170
}
171171
}
@@ -232,7 +232,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
232232
logsCrit: crit,
233233
created: time.Now(),
234234
logs: logs,
235-
hashes: make(chan []common.Hash),
235+
txs: make(chan []*types.Transaction),
236236
headers: make(chan *types.Header),
237237
installed: make(chan struct{}),
238238
err: make(chan error),
@@ -249,7 +249,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
249249
logsCrit: crit,
250250
created: time.Now(),
251251
logs: logs,
252-
hashes: make(chan []common.Hash),
252+
txs: make(chan []*types.Transaction),
253253
headers: make(chan *types.Header),
254254
installed: make(chan struct{}),
255255
err: make(chan error),
@@ -266,7 +266,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
266266
logsCrit: crit,
267267
created: time.Now(),
268268
logs: logs,
269-
hashes: make(chan []common.Hash),
269+
txs: make(chan []*types.Transaction),
270270
headers: make(chan *types.Header),
271271
installed: make(chan struct{}),
272272
err: make(chan error),
@@ -282,23 +282,23 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
282282
typ: BlocksSubscription,
283283
created: time.Now(),
284284
logs: make(chan []*types.Log),
285-
hashes: make(chan []common.Hash),
285+
txs: make(chan []*types.Transaction),
286286
headers: headers,
287287
installed: make(chan struct{}),
288288
err: make(chan error),
289289
}
290290
return es.subscribe(sub)
291291
}
292292

293-
// SubscribePendingTxs creates a subscription that writes transaction hashes for
293+
// SubscribePendingTxs creates a subscription that writes transactions for
294294
// transactions that enter the transaction pool.
295-
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
295+
func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subscription {
296296
sub := &subscription{
297297
id: rpc.NewID(),
298298
typ: PendingTransactionsSubscription,
299299
created: time.Now(),
300300
logs: make(chan []*types.Log),
301-
hashes: hashes,
301+
txs: txs,
302302
headers: make(chan *types.Header),
303303
installed: make(chan struct{}),
304304
err: make(chan error),
@@ -342,12 +342,8 @@ func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLog
342342
}
343343

344344
func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
345-
hashes := make([]common.Hash, 0, len(ev.Txs))
346-
for _, tx := range ev.Txs {
347-
hashes = append(hashes, tx.Hash())
348-
}
349345
for _, f := range filters[PendingTransactionsSubscription] {
350-
f.hashes <- hashes
346+
f.txs <- ev.Txs
351347
}
352348
}
353349

eth/filters/filter_system_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func TestPendingTxFilter(t *testing.T) {
233233
types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
234234
}
235235

236-
hashes []common.Hash
236+
txs []*types.Transaction
237237
)
238238

239239
fid0 := api.NewPendingTransactionFilter()
@@ -248,9 +248,9 @@ func TestPendingTxFilter(t *testing.T) {
248248
t.Fatalf("Unable to retrieve logs: %v", err)
249249
}
250250

251-
h := results.([]common.Hash)
252-
hashes = append(hashes, h...)
253-
if len(hashes) >= len(transactions) {
251+
tx := results.([]*types.Transaction)
252+
txs = append(txs, tx...)
253+
if len(txs) >= len(transactions) {
254254
break
255255
}
256256
// check timeout
@@ -261,13 +261,13 @@ func TestPendingTxFilter(t *testing.T) {
261261
time.Sleep(100 * time.Millisecond)
262262
}
263263

264-
if len(hashes) != len(transactions) {
265-
t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
264+
if len(txs) != len(transactions) {
265+
t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(txs))
266266
return
267267
}
268-
for i := range hashes {
269-
if hashes[i] != transactions[i].Hash() {
270-
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
268+
for i := range txs {
269+
if txs[i].Hash() != transactions[i].Hash() {
270+
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash())
271271
}
272272
}
273273
}
@@ -647,11 +647,11 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
647647
fids[i] = fid
648648
// Wait for at least one tx to arrive in filter
649649
for {
650-
hashes, err := api.GetFilterChanges(fid)
650+
txs, err := api.GetFilterChanges(fid)
651651
if err != nil {
652652
t.Fatalf("Filter should exist: %v\n", err)
653653
}
654-
if len(hashes.([]common.Hash)) > 0 {
654+
if len(txs.([]*types.Transaction)) > 0 {
655655
break
656656
}
657657
runtime.Gosched()

ethclient/gethclient/gethclient.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,12 @@ func (ec *Client) GetNodeInfo(ctx context.Context) (*p2p.NodeInfo, error) {
174174
return &result, err
175175
}
176176

177-
// SubscribePendingTransactions subscribes to new pending transactions.
177+
// SubscribeFullPendingTransactions subscribes to new pending transactions.
178+
func (ec *Client) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (*rpc.ClientSubscription, error) {
179+
return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", true)
180+
}
181+
182+
// SubscribePendingTransactions subscribes to new pending transaction hashes.
178183
func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) {
179184
return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions")
180185
}

ethclient/gethclient/gethclient_test.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,11 @@ func TestGethClient(t *testing.T) {
120120
"TestSetHead",
121121
func(t *testing.T) { testSetHead(t, client) },
122122
}, {
123-
"TestSubscribePendingTxs",
123+
"TestSubscribePendingTxHashes",
124124
func(t *testing.T) { testSubscribePendingTransactions(t, client) },
125+
}, {
126+
"TestSubscribePendingTxs",
127+
func(t *testing.T) { testSubscribeFullPendingTransactions(t, client) },
125128
}, {
126129
"TestCallContract",
127130
func(t *testing.T) { testCallContract(t, client) },
@@ -283,6 +286,40 @@ func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) {
283286
}
284287
}
285288

289+
func testSubscribeFullPendingTransactions(t *testing.T, client *rpc.Client) {
290+
ec := New(client)
291+
ethcl := ethclient.NewClient(client)
292+
// Subscribe to Transactions
293+
ch := make(chan *types.Transaction)
294+
ec.SubscribeFullPendingTransactions(context.Background(), ch)
295+
// Send a transaction
296+
chainID, err := ethcl.ChainID(context.Background())
297+
if err != nil {
298+
t.Fatal(err)
299+
}
300+
// Create transaction
301+
tx := types.NewTransaction(1, common.Address{1}, big.NewInt(1), 22000, big.NewInt(1), nil)
302+
signer := types.LatestSignerForChainID(chainID)
303+
signature, err := crypto.Sign(signer.Hash(tx).Bytes(), testKey)
304+
if err != nil {
305+
t.Fatal(err)
306+
}
307+
signedTx, err := tx.WithSignature(signer, signature)
308+
if err != nil {
309+
t.Fatal(err)
310+
}
311+
// Send transaction
312+
err = ethcl.SendTransaction(context.Background(), signedTx)
313+
if err != nil {
314+
t.Fatal(err)
315+
}
316+
// Check that the transaction was send over the channel
317+
tx = <-ch
318+
if tx.Hash() != signedTx.Hash() {
319+
t.Fatalf("Invalid tx hash received, got %v, want %v", tx.Hash(), signedTx.Hash())
320+
}
321+
}
322+
286323
func testCallContract(t *testing.T, client *rpc.Client) {
287324
ec := New(client)
288325
msg := ethereum.CallMsg{

0 commit comments

Comments
 (0)