Skip to content

Commit a49b42f

Browse files
authored
[Server] Move db write ops to background (#512)
* Move db write ops to background * Impl retry logic at repo level * Drop retry logic from app layer * Fix * Fix tests * Lint * Fix wasm wrapper
1 parent a9dfa11 commit a49b42f

File tree

15 files changed

+658
-226
lines changed

15 files changed

+658
-226
lines changed

pkg/client-sdk/test/wasm/wasm_test.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,20 @@ func TestWasm(t *testing.T) {
121121
require.NoError(t, err)
122122

123123
time.Sleep(3 * time.Second)
124-
err = generateBlock()
125-
require.NoError(t, err)
126-
time.Sleep(2 * time.Second)
124+
125+
wg := &sync.WaitGroup{}
126+
wg.Add(1)
127+
go func() {
128+
defer wg.Done()
129+
waitForPayment(alicePage, aliceAddr.OffchainAddr)
130+
}()
127131

128132
txID, err := settle(alicePage)
129133
require.NoError(t, err)
130134
t.Logf("Alice onboard txID: %v", txID)
131135

136+
wg.Wait()
137+
132138
aliceBalance, err := getBalance(alicePage)
133139
require.NoError(t, err)
134140
t.Logf("Alice onchain balance: %v", aliceBalance.OnchainSpendable)
@@ -141,20 +147,32 @@ func TestWasm(t *testing.T) {
141147

142148
amount := 1000
143149
t.Logf("Alice is sending %d sats to Bob offchain...", amount)
150+
wg.Add(1)
151+
go func() {
152+
defer wg.Done()
153+
waitForPayment(alicePage, bobAddr.OffchainAddr)
154+
}()
144155
require.NoError(t, sendOffChain(alicePage, bobAddr.OffchainAddr, amount))
145156

146157
t.Log("Transaction completed out of round")
147158

159+
wg.Wait()
160+
148161
t.Logf("Bob settling the received funds...")
162+
wg.Add(1)
163+
go func() {
164+
defer wg.Done()
165+
waitForPayment(bobPage, bobAddr.OffchainAddr)
166+
}()
149167
txID, err = settle(bobPage)
150168
require.NoError(t, err)
151169
t.Logf("Bob settled the received funds in round %v", txID)
152170

171+
wg.Wait()
172+
153173
err = generateBlock()
154174
require.NoError(t, err)
155175

156-
time.Sleep(5 * time.Second)
157-
158176
aliceBalance, err = getBalance(alicePage)
159177
require.NoError(t, err)
160178
t.Logf("Alice onchain balance: %v", aliceBalance.OnchainSpendable)
@@ -350,6 +368,18 @@ func sendOffChain(page playwright.Page, addr string, amount int) error {
350368
return err
351369
}
352370

371+
func waitForPayment(page playwright.Page, addr string) error {
372+
_, err := page.Evaluate(fmt.Sprintf(`async () => {
373+
try {
374+
return await notifyIncomingFunds("%s");
375+
} catch (err) {
376+
console.error("Error:", err);
377+
throw err;
378+
}
379+
}`, addr))
380+
return err
381+
}
382+
353383
func runCommand(name string, arg ...string) (string, error) {
354384
errb := new(strings.Builder)
355385
cmd := newCommand(name, arg...)

pkg/client-sdk/wasm/browser/wrappers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ func NotifyIncomingFundsWrapper() js.Func {
574574
return nil, err
575575
}
576576

577-
return result, nil
577+
return js.ValueOf(string(result)), nil
578578
})
579579
}
580580

server/internal/core/application/covenantless.go

Lines changed: 97 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -524,54 +524,53 @@ func (s *covenantlessService) SubmitRedeemTx(
524524
return "", "", fmt.Errorf("failed to sign redeem tx: %s", err)
525525
}
526526

527-
// Create new vtxos, update spent vtxos state
528-
newVtxos := make([]domain.Vtxo, 0, len(ptx.UnsignedTx.TxOut))
529-
for outIndex, out := range outputs {
530-
vtxoTapKey, err := schnorr.ParsePubKey(out.PkScript[2:])
531-
if err != nil {
532-
return "", "", fmt.Errorf("failed to parse vtxo taproot key: %s", err)
527+
go func(ptx *psbt.Packet, signedRedeemTx, redeemTxid string) {
528+
ctx := context.Background()
529+
// Create new vtxos, update spent vtxos state
530+
newVtxos := make([]domain.Vtxo, 0, len(ptx.UnsignedTx.TxOut))
531+
for outIndex, out := range outputs {
532+
//notlint:all
533+
vtxoPubkey := hex.EncodeToString(out.PkScript[2:])
534+
535+
newVtxos = append(newVtxos, domain.Vtxo{
536+
VtxoKey: domain.VtxoKey{
537+
Txid: redeemTxid,
538+
VOut: uint32(outIndex),
539+
},
540+
PubKey: vtxoPubkey,
541+
Amount: uint64(out.Value),
542+
ExpireAt: expiration,
543+
RoundTxid: roundTxid,
544+
RedeemTx: signedRedeemTx,
545+
CreatedAt: time.Now().Unix(),
546+
})
533547
}
534548

535-
vtxoPubkey := hex.EncodeToString(schnorr.SerializePubKey(vtxoTapKey))
536-
537-
newVtxos = append(newVtxos, domain.Vtxo{
538-
VtxoKey: domain.VtxoKey{
539-
Txid: redeemTxid,
540-
VOut: uint32(outIndex),
541-
},
542-
PubKey: vtxoPubkey,
543-
Amount: uint64(out.Value),
544-
ExpireAt: expiration,
545-
RoundTxid: roundTxid,
546-
RedeemTx: signedRedeemTx,
547-
CreatedAt: time.Now().Unix(),
548-
})
549-
}
549+
if err := s.repoManager.Vtxos().AddVtxos(ctx, newVtxos); err != nil {
550+
log.WithError(err).Warn("failed to add vtxos")
551+
return
552+
}
553+
log.Debugf("added %d vtxos", len(newVtxos))
550554

551-
if err := s.repoManager.Vtxos().AddVtxos(ctx, newVtxos); err != nil {
552-
return "", "", fmt.Errorf("failed to add vtxos: %s", err)
553-
}
554-
log.Infof("added %d vtxos", len(newVtxos))
555-
if err := s.startWatchingVtxos(newVtxos); err != nil {
556-
log.WithError(err).Warn(
557-
"failed to start watching vtxos",
558-
)
559-
}
560-
log.Debugf("started watching %d vtxos", len(newVtxos))
555+
if err := s.repoManager.Vtxos().SpendVtxos(ctx, spentVtxoKeys, redeemTxid); err != nil {
556+
log.WithError(err).Warn("failed to spend vtxos")
557+
return
558+
}
559+
log.Debugf("spent %d vtxos", len(spentVtxos))
561560

562-
if err := s.repoManager.Vtxos().SpendVtxos(ctx, spentVtxoKeys, redeemTxid); err != nil {
563-
return "", "", fmt.Errorf("failed to spend vtxo: %s", err)
564-
}
565-
log.Infof("spent %d vtxos", len(spentVtxos))
561+
if err := s.startWatchingVtxos(newVtxos); err != nil {
562+
log.WithError(err).Warn("failed to start watching vtxos")
563+
} else {
564+
log.Debugf("started watching %d vtxos", len(newVtxos))
565+
}
566566

567-
go func() {
568567
s.transactionEventsCh <- RedeemTransactionEvent{
569568
RedeemTxid: redeemTxid,
570569
SpentVtxos: spentVtxos,
571570
SpendableVtxos: newVtxos,
572571
TxHex: signedRedeemTx,
573572
}
574-
}()
573+
}(ptx, signedRedeemTx, redeemTxid)
575574

576575
return signedRedeemTx, redeemTxid, nil
577576
}
@@ -1179,20 +1178,23 @@ func (s *covenantlessService) RegisterCosignerNonces(
11791178
if err != nil {
11801179
return fmt.Errorf("failed to decode nonces: %s", err)
11811180
}
1182-
session.lock.Lock()
1183-
defer session.lock.Unlock()
11841181

1185-
if _, ok := session.nonces[pubkey]; ok {
1186-
return nil // skip if we already have nonces for this pubkey
1187-
}
1182+
go func(session *musigSigningSession) {
1183+
session.lock.Lock()
1184+
defer session.lock.Unlock()
11881185

1189-
session.nonces[pubkey] = nonces
1186+
if _, ok := session.nonces[pubkey]; ok {
1187+
return // skip if we already have nonces for this pubkey
1188+
}
11901189

1191-
if len(session.nonces) == session.nbCosigners-1 { // exclude the server
1192-
go func() {
1193-
session.nonceDoneC <- struct{}{}
1194-
}()
1195-
}
1190+
session.nonces[pubkey] = nonces
1191+
1192+
if len(session.nonces) == session.nbCosigners-1 { // exclude the server
1193+
go func() {
1194+
session.nonceDoneC <- struct{}{}
1195+
}()
1196+
}
1197+
}(session)
11961198

11971199
return nil
11981200
}
@@ -1215,20 +1217,22 @@ func (s *covenantlessService) RegisterCosignerSignatures(
12151217
return fmt.Errorf("failed to decode signatures: %s", err)
12161218
}
12171219

1218-
session.lock.Lock()
1219-
defer session.lock.Unlock()
1220+
go func(session *musigSigningSession) {
1221+
session.lock.Lock()
1222+
defer session.lock.Unlock()
12201223

1221-
if _, ok := session.signatures[pubkey]; ok {
1222-
return nil // skip if we already have signatures for this pubkey
1223-
}
1224+
if _, ok := session.signatures[pubkey]; ok {
1225+
return // skip if we already have signatures for this pubkey
1226+
}
12241227

1225-
session.signatures[pubkey] = signatures
1228+
session.signatures[pubkey] = signatures
12261229

1227-
if len(session.signatures) == session.nbCosigners-1 { // exclude the server
1228-
go func() {
1229-
session.sigDoneC <- struct{}{}
1230-
}()
1231-
}
1230+
if len(session.signatures) == session.nbCosigners-1 { // exclude the server
1231+
go func() {
1232+
session.sigDoneC <- struct{}{}
1233+
}()
1234+
}
1235+
}(session)
12321236

12331237
return nil
12341238
}
@@ -1248,13 +1252,18 @@ func (s *covenantlessService) SetNostrRecipient(ctx context.Context, nostrRecipi
12481252
vtxoKeys = append(vtxoKeys, signedVtxo.Outpoint)
12491253
}
12501254

1251-
return s.repoManager.Entities().Add(
1252-
ctx,
1253-
domain.Entity{
1254-
NostrRecipient: nprofileRecipient,
1255-
},
1256-
vtxoKeys,
1257-
)
1255+
entity := domain.Entity{NostrRecipient: nprofileRecipient}
1256+
go func(entity domain.Entity, vtxoKeys []domain.VtxoKey) {
1257+
ctx := context.Background()
1258+
1259+
if err := s.repoManager.Entities().Add(ctx, entity, vtxoKeys); err != nil {
1260+
log.WithError(err).Warn("failed to add nostr identity, retrying...")
1261+
return
1262+
}
1263+
log.Debug("added new nostr identity")
1264+
}(entity, vtxoKeys)
1265+
1266+
return nil
12581267
}
12591268

12601269
func (s *covenantlessService) DeleteNostrRecipient(ctx context.Context, signedVtxoOutpoints []SignedVtxoOutpoint) error {
@@ -1267,7 +1276,28 @@ func (s *covenantlessService) DeleteNostrRecipient(ctx context.Context, signedVt
12671276
vtxoKeys = append(vtxoKeys, signedVtxo.Outpoint)
12681277
}
12691278

1270-
return s.repoManager.Entities().Delete(ctx, vtxoKeys)
1279+
fetchedKeys := make([]domain.VtxoKey, 0, len(vtxoKeys))
1280+
for _, vtxoKey := range vtxoKeys {
1281+
entities, err := s.repoManager.Entities().Get(ctx, vtxoKey)
1282+
if err != nil || len(entities) == 0 {
1283+
continue
1284+
}
1285+
fetchedKeys = append(fetchedKeys, vtxoKey)
1286+
}
1287+
if len(fetchedKeys) <= 0 {
1288+
return nil
1289+
}
1290+
1291+
go func(vtxoKeys []domain.VtxoKey) {
1292+
ctx := context.Background()
1293+
if err := s.repoManager.Entities().Delete(ctx, vtxoKeys); err != nil {
1294+
log.WithError(err).Warn("failed to delete nostr identity, retrying...")
1295+
return
1296+
}
1297+
log.Debug("deleted nostr identities")
1298+
}(fetchedKeys)
1299+
1300+
return nil
12711301
}
12721302

12731303
func (s *covenantlessService) start() {

server/internal/infrastructure/db/badger/entity_repo.go

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package badgerdb
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"path/filepath"
8+
"time"
79

810
"github.com/ark-network/ark/server/internal/core/domain"
911
"github.com/dgraph-io/badger/v4"
@@ -72,9 +74,16 @@ func (r *entityRepository) Add(ctx context.Context, entity domain.Entity, vtxoKe
7274
entities.Entities = append(entities.Entities, entity)
7375
}
7476

75-
err = r.store.Upsert(vtxoKey.String(), entities)
76-
if err != nil {
77-
return fmt.Errorf("failed to upsert entity: %w", err)
77+
if err := r.store.Upsert(vtxoKey.String(), entities); err != nil {
78+
if errors.Is(err, badger.ErrConflict) {
79+
attempts := 1
80+
for errors.Is(err, badger.ErrConflict) && attempts <= maxRetries {
81+
time.Sleep(100 * time.Millisecond)
82+
err = r.store.Upsert(vtxoKey.String(), entities)
83+
attempts++
84+
}
85+
}
86+
return err
7887
}
7988
}
8089
return nil
@@ -98,12 +107,19 @@ func (r *entityRepository) Delete(ctx context.Context, vtxoKeys []domain.VtxoKey
98107
}
99108

100109
for _, vtxoKey := range vtxoKeys {
101-
err := r.store.Delete(vtxoKey.String(), &entities{})
102-
if err == badgerhold.ErrNotFound {
103-
return fmt.Errorf("entities not found for key: %s", vtxoKey)
104-
}
105-
if err != nil {
106-
return fmt.Errorf("failed to delete entity: %w", err)
110+
if err := r.store.Delete(vtxoKey.String(), &entities{}); err != nil {
111+
if errors.Is(err, badgerhold.ErrNotFound) {
112+
continue
113+
}
114+
if errors.Is(err, badger.ErrConflict) {
115+
attempts := 1
116+
for errors.Is(err, badger.ErrConflict) && attempts <= maxRetries {
117+
time.Sleep(100 * time.Millisecond)
118+
err = r.store.Delete(vtxoKey.String(), &entities{})
119+
attempts++
120+
}
121+
}
122+
return err
107123
}
108124
}
109125
return nil

0 commit comments

Comments
 (0)