Skip to content

Commit 2e3a3c3

Browse files
committed
implement fetching mempool and rbf screening
1 parent b03ae02 commit 2e3a3c3

File tree

3 files changed

+189
-73
lines changed

3 files changed

+189
-73
lines changed

pkg/client-sdk/client.go

Lines changed: 102 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1007,6 +1007,46 @@ func (a *covenantlessArkClient) refreshVtxoDb(spendableVtxos, spentVtxos []clien
10071007
return nil
10081008
}
10091009

1010+
func (a *covenantlessArkClient) getTransaction(ctx context.Context, txId string) (string, uint64, error) {
1011+
_, boardingAddresses, _, err := a.wallet.GetAddresses(ctx)
1012+
if err != nil {
1013+
return "", 0, err
1014+
}
1015+
1016+
txHex, err := a.explorer.GetTxHex(txId)
1017+
if err != nil {
1018+
return "", 0, err
1019+
}
1020+
rawTx := &wire.MsgTx{}
1021+
if err := rawTx.Deserialize(strings.NewReader(txHex)); err != nil {
1022+
return "", 0, err
1023+
}
1024+
1025+
netParams := utils.ToBitcoinNetwork(a.Network)
1026+
amount := uint64(0)
1027+
for _, addr := range boardingAddresses {
1028+
decoded, err := btcutil.DecodeAddress(addr.Address, &netParams)
1029+
if err != nil {
1030+
return "", 0, err
1031+
}
1032+
pkScript, err := txscript.PayToAddrScript(decoded)
1033+
if err != nil {
1034+
return "", 0, err
1035+
}
1036+
for _, out := range rawTx.TxOut {
1037+
if bytes.Equal(out.PkScript, pkScript) {
1038+
amount = uint64(out.Value)
1039+
break
1040+
}
1041+
}
1042+
if amount > 0 {
1043+
break
1044+
}
1045+
}
1046+
1047+
return txHex, amount, nil
1048+
}
1049+
10101050
func (a *covenantlessArkClient) listenWebsocketBoardingTxns(ctx context.Context) {
10111051
// try to add existing boarding utxos if present
10121052
_, boardingAddresses, _, err := a.wallet.GetAddresses(ctx)
@@ -1018,47 +1058,85 @@ func (a *covenantlessArkClient) listenWebsocketBoardingTxns(ctx context.Context)
10181058
}
10191059
}
10201060

1021-
err = a.explorer.ListenAddresses(func(utxos []explorer.BlockUtxo) error {
1022-
_, boardingAddresses, _, err := a.wallet.GetAddresses(ctx)
1023-
if err != nil {
1024-
return err
1025-
}
1061+
err = a.explorer.ListenAddresses(func(boaringUtxos, mempoolUtxos []explorer.BlockUtxo) error {
1062+
if len(mempoolUtxos) > 0 {
1063+
newPendingBoardingTxs := make([]types.Transaction, 0)
1064+
createdAt := time.Now()
10261065

1027-
_, blockTime, err := a.explorer.GetTxBlockTime(utxos[0].Txid)
1028-
if err != nil {
1029-
return err
1030-
}
1066+
for _, u := range mempoolUtxos {
10311067

1032-
newPendingBoardingTxs := make([]types.Transaction, 0)
1068+
isRbf, replacementTxIds, err := a.explorer.FetchMempoolRBFTx(u.Txid)
1069+
if err != nil {
1070+
return err
1071+
}
1072+
1073+
if isRbf {
1074+
txHex, amount, err := a.getTransaction(ctx, u.Txid)
1075+
if err != nil {
1076+
log.WithError(err).Errorf("failed to get rbf transaction %s", u.Txid)
1077+
return err
1078+
}
1079+
1080+
rbfTransactions := make(map[string]types.Transaction, 0)
1081+
for _, rbfTxId := range replacementTxIds {
1082+
rbfTransactions[rbfTxId] = types.Transaction{
1083+
TransactionKey: types.TransactionKey{
1084+
BoardingTxid: u.Txid,
1085+
},
1086+
Hex: txHex,
1087+
Amount: amount,
1088+
Type: types.TxReceived,
1089+
CreatedAt: time.Now(),
1090+
}
1091+
}
10331092

1034-
for _, u := range utxos {
1035-
found := false
1036-
for _, addr := range boardingAddresses {
1037-
if addr.Address == u.ScriptPubAddress {
1038-
found = true
1039-
break
1093+
count, err := a.store.TransactionStore().RbfTransactions(ctx, rbfTransactions)
1094+
if err != nil {
1095+
log.WithError(err).Error("failed to update rbf boarding transactions")
1096+
return err
1097+
}
1098+
log.Debugf("replaced %d transaction(s)", count)
1099+
1100+
continue
10401101
}
1041-
}
1042-
if found {
1102+
10431103
newPendingBoardingTxs = append(newPendingBoardingTxs, types.Transaction{
10441104
TransactionKey: types.TransactionKey{
10451105
BoardingTxid: u.Txid,
10461106
},
10471107
Amount: u.Value,
10481108
Type: types.TxReceived,
1049-
CreatedAt: time.Unix(blockTime, 0),
1109+
CreatedAt: createdAt,
10501110
})
10511111
}
1112+
1113+
count, err := a.store.TransactionStore().
1114+
AddTransactions(ctx, newPendingBoardingTxs)
1115+
1116+
if err != nil {
1117+
log.WithError(err).Error("failed to add new boarding transactions")
1118+
return err
1119+
}
1120+
log.Debugf("added %d boarding transaction(s)", count)
1121+
}
1122+
1123+
if len(boaringUtxos) > 0 {
1124+
ids := make([]string, 0, len(boaringUtxos))
1125+
for _, u := range boaringUtxos {
1126+
ids = append(ids, u.Txid)
1127+
}
1128+
count, err := a.store.TransactionStore().ConfirmTransactions(
1129+
ctx, ids, time.Now(),
1130+
)
1131+
if err != nil {
1132+
log.WithError(err).Error("failed to update boarding transactions")
1133+
}
1134+
log.Debugf("confirmed %d boarding transaction(s)", count)
10521135
}
10531136

1054-
count, err := a.store.TransactionStore().
1055-
AddTransactions(ctx, newPendingBoardingTxs)
10561137
if err != nil {
10571138
return err
1058-
10591139
}
1060-
log.Debugf("added %d boarding transaction(s)", count)
1061-
10621140
return nil
10631141
})
10641142

pkg/client-sdk/explorer/explorer.go

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ type Explorer interface {
4343
BaseUrl() string
4444
GetFeeRate() (float64, error)
4545
TrackAddress(addr string) error
46-
ListenAddresses(messageHandler func([]BlockUtxo) error) error
46+
ListenAddresses(messageHandler func([]BlockUtxo, []BlockUtxo) error) error
47+
FetchMempoolRBFTx(txid string) (bool, []string, error)
4748
}
4849

4950
type AddrTracker struct {
@@ -187,6 +188,28 @@ func (e *explorerSvc) GetTxs(addr string) ([]tx, error) {
187188
return payload, nil
188189
}
189190

191+
func (e *explorerSvc) GetTxByTxid(addr string) ([]tx, error) {
192+
resp, err := http.Get(fmt.Sprintf("%s/address/%s/txs", e.baseUrl, addr))
193+
if err != nil {
194+
return nil, err
195+
}
196+
// nolint:all
197+
defer resp.Body.Close()
198+
body, err := io.ReadAll(resp.Body)
199+
if err != nil {
200+
return nil, err
201+
}
202+
if resp.StatusCode != http.StatusOK {
203+
return nil, fmt.Errorf("failed to get txs: %s", string(body))
204+
}
205+
payload := []tx{}
206+
if err := json.Unmarshal(body, &payload); err != nil {
207+
return nil, err
208+
}
209+
210+
return payload, nil
211+
}
212+
190213
func (e explorerSvc) IsRBFTx(txid, txHex string) (bool, string, int64, error) {
191214
resp, err := http.Get(fmt.Sprintf("%s/v1/fullrbf/replacements", e.baseUrl))
192215
if err != nil {
@@ -206,8 +229,8 @@ func (e explorerSvc) IsRBFTx(txid, txHex string) (bool, string, int64, error) {
206229
return false, "", -1, fmt.Errorf("%s", string(body))
207230
}
208231

209-
isRbf, replacedBy, timestamp, err := e.mempoolIsRBFTx(
210-
fmt.Sprintf("%s/v1/fullrbf/replacements", e.baseUrl), txid,
232+
isRbf, replacedBy, timestamp, _, err := e.mempoolIsRBFTx(
233+
fmt.Sprintf("%s/v1/fullrbf/replacements", e.baseUrl), txid, false,
211234
)
212235
if err != nil {
213236
return false, "", -1, err
@@ -216,7 +239,24 @@ func (e explorerSvc) IsRBFTx(txid, txHex string) (bool, string, int64, error) {
216239
return isRbf, replacedBy, timestamp, nil
217240
}
218241

219-
return e.mempoolIsRBFTx(fmt.Sprintf("%s/v1/replacements", e.baseUrl), txid)
242+
isRbf, replacedBy, timestamp, _, err = e.mempoolIsRBFTx(fmt.Sprintf("%s/v1/replacements", e.baseUrl), txid, false)
243+
244+
return isRbf, replacedBy, timestamp, err
245+
}
246+
247+
func (e *explorerSvc) FetchMempoolRBFTx(txid string) (bool, []string, error) {
248+
isRbf, _, _, replacements, err := e.mempoolIsRBFTx(
249+
fmt.Sprintf("%s/v1/fullrbf/replacements", e.baseUrl), txid, true,
250+
)
251+
if err != nil {
252+
return false, nil, err
253+
}
254+
if isRbf {
255+
return true, replacements, nil
256+
}
257+
258+
isRbf, _, _, replacements, err = e.mempoolIsRBFTx(fmt.Sprintf("%s/v1/replacements", e.baseUrl), txid, false)
259+
return isRbf, replacements, err
220260
}
221261

222262
func (e *explorerSvc) GetTxOutspends(txid string) ([]spentStatus, error) {
@@ -310,7 +350,7 @@ func (e *explorerSvc) GetRedeemedVtxosBalance(
310350
return
311351
}
312352

313-
func (e *explorerSvc) ListenAddresses(messageHandler func([]BlockUtxo) error) error {
353+
func (e *explorerSvc) ListenAddresses(messageHandler func([]BlockUtxo, []BlockUtxo) error) error {
314354
if e.addrTracker == nil {
315355
return fmt.Errorf("address tracker not initialized")
316356
}
@@ -395,36 +435,49 @@ func (e *explorerSvc) broadcast(txHex string) (string, error) {
395435
return string(bodyResponse), nil
396436
}
397437

398-
func (e *explorerSvc) mempoolIsRBFTx(url, txid string) (bool, string, int64, error) {
438+
func (e *explorerSvc) mempoolIsRBFTx(url, txid string, isReplacing bool) (bool, string, int64, []string, error) {
399439
resp, err := http.Get(url)
400440
if err != nil {
401-
return false, "", -1, err
441+
return false, "", -1, nil, err
402442
}
403443

404444
// nolint:all
405445
defer resp.Body.Close()
406446
body, err := io.ReadAll(resp.Body)
407447
if err != nil {
408-
return false, "", -1, err
448+
return false, "", -1, nil, err
409449
}
410450

411451
if resp.StatusCode != http.StatusOK {
412-
return false, "", -1, fmt.Errorf("%s", string(body))
452+
return false, "", -1, nil, fmt.Errorf("%s", string(body))
413453
}
414454

415455
replacements := make([]replacement, 0)
416456
if err := json.Unmarshal(body, &replacements); err != nil {
417-
return false, "", -1, err
457+
return false, "", -1, nil, err
458+
}
459+
460+
if isReplacing {
461+
for _, r := range replacements {
462+
if r.Tx.Txid == txid {
463+
replacementTxIds := make([]string, 0, len(r.Replaces))
464+
for _, rr := range r.Replaces {
465+
replacementTxIds = append(replacementTxIds, rr.Tx.Txid)
466+
}
467+
return true, r.Tx.Txid, r.Timestamp, replacementTxIds, nil
468+
}
469+
}
470+
return false, "", 0, nil, nil
418471
}
419472

420473
for _, r := range replacements {
421474
for _, rr := range r.Replaces {
422475
if rr.Tx.Txid == txid {
423-
return true, r.Tx.Txid, r.Timestamp, nil
476+
return true, r.Tx.Txid, r.Timestamp, nil, nil
424477
}
425478
}
426479
}
427-
return false, "", 0, nil
480+
return false, "", 0, nil, nil
428481
}
429482

430483
func (e *explorerSvc) esploraIsRBFTx(txid, txHex string) (bool, string, int64, error) {
@@ -557,7 +610,7 @@ func (t *AddrTracker) TrackAddress(addr string) error {
557610
return nil
558611
}
559612

560-
func (t *AddrTracker) ListenAddresses(messageHandler func([]BlockUtxo) error) error {
613+
func (t *AddrTracker) ListenAddresses(messageHandler func([]BlockUtxo, []BlockUtxo) error) error {
561614
// Send ping every 25s to keep alive
562615
go func() {
563616
ticker := time.NewTicker(25 * time.Second)
@@ -571,28 +624,27 @@ func (t *AddrTracker) ListenAddresses(messageHandler func([]BlockUtxo) error) er
571624
}()
572625

573626
for {
574-
var payload WSBlockTransactions
627+
var payload WSFetchTransactions
575628
err := t.conn.ReadJSON(&payload)
576629
if err != nil {
577630
return fmt.Errorf("read message failed: %w", err)
578631
}
579632

580-
if len(payload.BlockTransactions) == 0 {
581-
continue
582-
}
633+
mempoolutxos := t.deriveUtxos(payload.MempoolTransactions)
634+
blockutxos := t.deriveUtxos(payload.BlockTransactions)
635+
636+
err = messageHandler(blockutxos, mempoolutxos)
583637

584-
utxos := t.deriveUtxos(payload)
585-
err = messageHandler(utxos)
586638
if err != nil {
587639
return err
588640
}
589641
}
590642

591643
}
592644

593-
func (t *AddrTracker) deriveUtxos(block WSBlockTransactions) []BlockUtxo {
645+
func (t *AddrTracker) deriveUtxos(trasactions []RawTx) []BlockUtxo {
594646
utxos := make([]BlockUtxo, 0, len(t.subscribedMap))
595-
for _, rawTransaction := range block.BlockTransactions {
647+
for _, rawTransaction := range trasactions {
596648

597649
for index, out := range rawTransaction.Vout {
598650
if _, ok := t.subscribedMap[out.ScriptPubKeyAddr]; ok {
@@ -605,5 +657,6 @@ func (t *AddrTracker) deriveUtxos(block WSBlockTransactions) []BlockUtxo {
605657
}
606658
}
607659
}
660+
608661
return utxos
609662
}

0 commit comments

Comments
 (0)