@@ -2,7 +2,6 @@ package redislivestore
2
2
3
3
import (
4
4
"context"
5
- "errors"
6
5
"fmt"
7
6
"sort"
8
7
"time"
@@ -58,56 +57,33 @@ func (s *txRequestsStore) Len() int64 {
58
57
59
58
func (s * txRequestsStore ) Push (request domain.TxRequest , boardingInputs []ports.BoardingInput , cosignerPubkeys []string ) error {
60
59
ctx := context .Background ()
60
+ var err error
61
61
for attempt := 0 ; attempt < s .numOfRetries ; attempt ++ {
62
- err := s .rdb .Watch (ctx , func (tx * redis.Tx ) error {
63
- ids , err := tx .SMembers (ctx , txReqStoreReqIdsKey ).Result ()
64
- if err != nil && ! errors .Is (err , redis .Nil ) {
65
- return err
66
- }
67
-
68
- reqs , err := s .requests .GetMulti (ctx , ids )
62
+ err = s .rdb .Watch (ctx , func (tx * redis.Tx ) error {
63
+ exists , err := tx .SIsMember (ctx , txReqStoreReqIdsKey , request .Id ).Result ()
69
64
if err != nil {
70
65
return err
71
66
}
72
-
73
- if len (reqs ) > 0 {
74
- idToReq := make (map [string ]* ports.TimedTxRequest , len (ids ))
75
- for i , id := range ids {
76
- if reqs [i ] != nil {
77
- idToReq [id ] = reqs [i ]
78
- }
79
- }
80
-
81
- if _ , exists := idToReq [request .Id ]; exists {
82
- return fmt .Errorf ("duplicated tx request %s" , request .Id )
83
- }
84
-
85
- inputMap := make (map [string ]string )
86
- boardingInputMap := make (map [string ]string )
87
- for _ , pay := range idToReq {
88
- for _ , pInput := range pay .Inputs {
89
- key := fmt .Sprintf ("%s:%d" , pInput .Txid , pInput .VOut )
90
- inputMap [key ] = pay .Id
91
- }
92
- for _ , pBoardingInput := range pay .BoardingInputs {
93
- key := fmt .Sprintf ("%s:%d" , pBoardingInput .Txid , pBoardingInput .VOut )
94
- boardingInputMap [key ] = pay .Id
95
- }
67
+ if exists {
68
+ return fmt .Errorf ("duplicated tx request %s" , request .Id )
69
+ }
70
+ // Check input duplicates directly in Redis set
71
+ for _ , input := range request .Inputs {
72
+ if input .IsNote () {
73
+ continue
96
74
}
97
- for _ , input := range request .Inputs {
98
- key := fmt .Sprintf ("%s:%d" , input .Txid , input .VOut )
99
- if dupId , exists := inputMap [key ]; exists {
100
- return fmt .Errorf ("duplicated input, %s already used by tx request %s" , key , dupId )
101
- }
75
+ key := input .String ()
76
+ exists , err := tx .SIsMember (ctx , txReqStoreVtxosKey , key ).Result ()
77
+ if err != nil {
78
+ return err
102
79
}
103
- for _ , input := range boardingInputs {
104
- key := fmt .Sprintf ("%s:%d" , input .Txid , input .VOut )
105
- if dupId , exists := boardingInputMap [key ]; exists {
106
- return fmt .Errorf ("duplicated boarding input, %s already used by tx request %s" , key , dupId )
107
- }
80
+ if exists {
81
+ return fmt .Errorf ("duplicated input, %s already registered by another request" , key )
108
82
}
109
83
}
110
84
85
+ // Check boarding inputs similarly if you store them
86
+
111
87
now := time .Now ()
112
88
timedReq := & ports.TimedTxRequest {
113
89
TxRequest : request ,
@@ -132,13 +108,13 @@ func (s *txRequestsStore) Push(request domain.TxRequest, boardingInputs []ports.
132
108
})
133
109
134
110
return err
135
- }, txReqStoreReqIdsKey )
111
+ }, txReqStoreVtxosKey , txReqStoreReqIdsKey ) // WATCH both keys
136
112
if err == nil {
137
113
return nil
138
114
}
139
115
time .Sleep (10 * time .Millisecond )
140
116
}
141
- return fmt . Errorf ( "push failed after %v retries" , s . numOfRetries )
117
+ return err
142
118
}
143
119
144
120
func (s * txRequestsStore ) Pop (num int64 ) []ports.TimedTxRequest {
0 commit comments