1
- use std:: path:: Path ;
1
+ use std:: { path:: Path , sync :: Arc } ;
2
2
3
3
use anyhow:: { Context , Result } ;
4
4
use iroh_metrics:: inc;
@@ -14,7 +14,7 @@ const SIGNED_PACKETS_TABLE: TableDefinition<&SignedPacketsKey, &[u8]> =
14
14
15
15
#[ derive( Debug ) ]
16
16
pub struct SignedPacketStore {
17
- db : Database ,
17
+ db : Arc < Database > ,
18
18
}
19
19
20
20
impl SignedPacketStore {
@@ -47,53 +47,68 @@ impl SignedPacketStore {
47
47
let _table = write_tx. open_table ( SIGNED_PACKETS_TABLE ) ?;
48
48
}
49
49
write_tx. commit ( ) ?;
50
- Ok ( Self { db } )
50
+ Ok ( Self { db : Arc :: new ( db ) } )
51
51
}
52
52
53
- pub fn upsert ( & self , packet : SignedPacket ) -> Result < bool > {
53
+ pub async fn upsert ( & self , packet : SignedPacket ) -> Result < bool > {
54
54
let key = PublicKeyBytes :: from_signed_packet ( & packet) ;
55
- let tx = self . db . begin_write ( ) ?;
56
- let mut replaced = false ;
57
- {
58
- let mut table = tx. open_table ( SIGNED_PACKETS_TABLE ) ?;
59
- if let Some ( existing) = get_packet ( & table, & key) ? {
60
- if existing. more_recent_than ( & packet) {
61
- return Ok ( false ) ;
62
- } else {
63
- replaced = true ;
55
+ let db = self . db . clone ( ) ;
56
+ tokio:: task:: spawn_blocking ( move || {
57
+ let tx = db. begin_write ( ) ?;
58
+ let mut replaced = false ;
59
+ {
60
+ let mut table = tx. open_table ( SIGNED_PACKETS_TABLE ) ?;
61
+ if let Some ( existing) = get_packet ( & table, & key) ? {
62
+ if existing. more_recent_than ( & packet) {
63
+ return Ok ( false ) ;
64
+ } else {
65
+ replaced = true ;
66
+ }
64
67
}
68
+ let value = packet. as_bytes ( ) ;
69
+ table. insert ( key. as_bytes ( ) , & value[ ..] ) ?;
65
70
}
66
- let value = packet. as_bytes ( ) ;
67
- table. insert ( key. as_bytes ( ) , & value[ ..] ) ?;
68
- }
69
- tx. commit ( ) ?;
70
- if replaced {
71
- inc ! ( Metrics , store_packets_updated) ;
72
- } else {
73
- inc ! ( Metrics , store_packets_inserted) ;
74
- }
75
- Ok ( true )
71
+ tx. commit ( ) ?;
72
+ if replaced {
73
+ inc ! ( Metrics , store_packets_updated) ;
74
+ } else {
75
+ inc ! ( Metrics , store_packets_inserted) ;
76
+ }
77
+ Ok ( true )
78
+ } )
79
+ . await ?
76
80
}
77
81
78
- pub fn get ( & self , key : & PublicKeyBytes ) -> Result < Option < SignedPacket > > {
79
- let tx = self . db . begin_read ( ) ?;
80
- let table = tx. open_table ( SIGNED_PACKETS_TABLE ) ?;
81
- get_packet ( & table, key)
82
+ pub async fn get ( & self , key : & PublicKeyBytes ) -> Result < Option < SignedPacket > > {
83
+ let db = self . db . clone ( ) ;
84
+ let key = key. clone ( ) ;
85
+ let res = tokio:: task:: spawn_blocking ( move || {
86
+ let tx = db. begin_read ( ) ?;
87
+ let table = tx. open_table ( SIGNED_PACKETS_TABLE ) ?;
88
+ get_packet ( & table, & key)
89
+ } )
90
+ . await ??;
91
+ Ok ( res)
82
92
}
83
93
84
- pub fn remove ( & self , key : & PublicKeyBytes ) -> Result < bool > {
85
- let tx = self . db . begin_write ( ) ?;
86
- let updated = {
87
- let mut table = tx. open_table ( SIGNED_PACKETS_TABLE ) ?;
88
- let did_remove = table. remove ( key. as_bytes ( ) ) ?. is_some ( ) ;
89
- #[ allow( clippy:: let_and_return) ]
90
- did_remove
91
- } ;
92
- tx. commit ( ) ?;
93
- if updated {
94
- inc ! ( Metrics , store_packets_removed)
95
- }
96
- Ok ( updated)
94
+ pub async fn remove ( & self , key : & PublicKeyBytes ) -> Result < bool > {
95
+ let db = self . db . clone ( ) ;
96
+ let key = key. clone ( ) ;
97
+ tokio:: task:: spawn_blocking ( move || {
98
+ let tx = db. begin_write ( ) ?;
99
+ let updated = {
100
+ let mut table = tx. open_table ( SIGNED_PACKETS_TABLE ) ?;
101
+ let did_remove = table. remove ( key. as_bytes ( ) ) ?. is_some ( ) ;
102
+ #[ allow( clippy:: let_and_return) ]
103
+ did_remove
104
+ } ;
105
+ tx. commit ( ) ?;
106
+ if updated {
107
+ inc ! ( Metrics , store_packets_removed)
108
+ }
109
+ Ok ( updated)
110
+ } )
111
+ . await ?
97
112
}
98
113
}
99
114
0 commit comments