Skip to content

Commit 647b2fd

Browse files
fix(iroh-dns-server): remove accidental blocking from store (#2985)
## Description I found this during investigations of #2972. It turned out that both blocking locks and blocking IO calls are being made in the dns servers store implementation ## Breaking Changes None ## Notes & open questions This might be not optimal, but it is the safest way to get rid of blocking the whole runtime for now. ## Change checklist - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented.
1 parent 80a40c0 commit 647b2fd

File tree

2 files changed

+63
-46
lines changed

2 files changed

+63
-46
lines changed

iroh-dns-server/src/store.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use anyhow::Result;
66
use hickory_proto::rr::{Name, RecordSet, RecordType, RrKey};
77
use iroh_metrics::inc;
88
use lru::LruCache;
9-
use parking_lot::Mutex;
109
use pkarr::{mainline::dht::DhtSettings, PkarrClient, SignedPacket};
10+
use tokio::sync::Mutex;
1111
use tracing::{debug, trace};
1212
use ttl_cache::TtlCache;
1313

@@ -97,14 +97,15 @@ impl ZoneStore {
9797
record_type: RecordType,
9898
) -> Result<Option<Arc<RecordSet>>> {
9999
tracing::info!("{} {}", name, record_type);
100-
if let Some(rset) = self.cache.lock().resolve(pubkey, name, record_type) {
100+
if let Some(rset) = self.cache.lock().await.resolve(pubkey, name, record_type) {
101101
return Ok(Some(rset));
102102
}
103103

104-
if let Some(packet) = self.store.get(pubkey)? {
104+
if let Some(packet) = self.store.get(pubkey).await? {
105105
return self
106106
.cache
107107
.lock()
108+
.await
108109
.insert_and_resolve(&packet, name, record_type);
109110
};
110111

@@ -120,6 +121,7 @@ impl ZoneStore {
120121
return self
121122
.cache
122123
.lock()
124+
.await
123125
.insert_and_resolve_dht(&packet, name, record_type);
124126
} else {
125127
debug!("DHT resolve failed");
@@ -132,7 +134,7 @@ impl ZoneStore {
132134
// allow unused async: this will be async soon.
133135
#[allow(clippy::unused_async)]
134136
pub async fn get_signed_packet(&self, pubkey: &PublicKeyBytes) -> Result<Option<SignedPacket>> {
135-
self.store.get(pubkey)
137+
self.store.get(pubkey).await
136138
}
137139

138140
/// Insert a signed packet into the cache and the store.
@@ -143,9 +145,9 @@ impl ZoneStore {
143145
#[allow(clippy::unused_async)]
144146
pub async fn insert(&self, signed_packet: SignedPacket, _source: PacketSource) -> Result<bool> {
145147
let pubkey = PublicKeyBytes::from_signed_packet(&signed_packet);
146-
if self.store.upsert(signed_packet)? {
148+
if self.store.upsert(signed_packet).await? {
147149
inc!(Metrics, pkarr_publish_update);
148-
self.cache.lock().remove(&pubkey);
150+
self.cache.lock().await.remove(&pubkey);
149151
Ok(true)
150152
} else {
151153
inc!(Metrics, pkarr_publish_noop);

iroh-dns-server/src/store/signed_packets.rs

Lines changed: 55 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::path::Path;
1+
use std::{path::Path, sync::Arc};
22

33
use anyhow::{Context, Result};
44
use iroh_metrics::inc;
@@ -14,7 +14,7 @@ const SIGNED_PACKETS_TABLE: TableDefinition<&SignedPacketsKey, &[u8]> =
1414

1515
#[derive(Debug)]
1616
pub struct SignedPacketStore {
17-
db: Database,
17+
db: Arc<Database>,
1818
}
1919

2020
impl SignedPacketStore {
@@ -47,53 +47,68 @@ impl SignedPacketStore {
4747
let _table = write_tx.open_table(SIGNED_PACKETS_TABLE)?;
4848
}
4949
write_tx.commit()?;
50-
Ok(Self { db })
50+
Ok(Self { db: Arc::new(db) })
5151
}
5252

53-
pub fn upsert(&self, packet: SignedPacket) -> Result<bool> {
53+
pub async fn upsert(&self, packet: SignedPacket) -> Result<bool> {
5454
let key = PublicKeyBytes::from_signed_packet(&packet);
55-
let tx = self.db.begin_write()?;
56-
let mut replaced = false;
57-
{
58-
let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?;
59-
if let Some(existing) = get_packet(&table, &key)? {
60-
if existing.more_recent_than(&packet) {
61-
return Ok(false);
62-
} else {
63-
replaced = true;
55+
let db = self.db.clone();
56+
tokio::task::spawn_blocking(move || {
57+
let tx = db.begin_write()?;
58+
let mut replaced = false;
59+
{
60+
let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?;
61+
if let Some(existing) = get_packet(&table, &key)? {
62+
if existing.more_recent_than(&packet) {
63+
return Ok(false);
64+
} else {
65+
replaced = true;
66+
}
6467
}
68+
let value = packet.as_bytes();
69+
table.insert(key.as_bytes(), &value[..])?;
6570
}
66-
let value = packet.as_bytes();
67-
table.insert(key.as_bytes(), &value[..])?;
68-
}
69-
tx.commit()?;
70-
if replaced {
71-
inc!(Metrics, store_packets_updated);
72-
} else {
73-
inc!(Metrics, store_packets_inserted);
74-
}
75-
Ok(true)
71+
tx.commit()?;
72+
if replaced {
73+
inc!(Metrics, store_packets_updated);
74+
} else {
75+
inc!(Metrics, store_packets_inserted);
76+
}
77+
Ok(true)
78+
})
79+
.await?
7680
}
7781

78-
pub fn get(&self, key: &PublicKeyBytes) -> Result<Option<SignedPacket>> {
79-
let tx = self.db.begin_read()?;
80-
let table = tx.open_table(SIGNED_PACKETS_TABLE)?;
81-
get_packet(&table, key)
82+
pub async fn get(&self, key: &PublicKeyBytes) -> Result<Option<SignedPacket>> {
83+
let db = self.db.clone();
84+
let key = *key;
85+
let res = tokio::task::spawn_blocking(move || {
86+
let tx = db.begin_read()?;
87+
let table = tx.open_table(SIGNED_PACKETS_TABLE)?;
88+
get_packet(&table, &key)
89+
})
90+
.await??;
91+
Ok(res)
8292
}
8393

84-
pub fn remove(&self, key: &PublicKeyBytes) -> Result<bool> {
85-
let tx = self.db.begin_write()?;
86-
let updated = {
87-
let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?;
88-
let did_remove = table.remove(key.as_bytes())?.is_some();
89-
#[allow(clippy::let_and_return)]
90-
did_remove
91-
};
92-
tx.commit()?;
93-
if updated {
94-
inc!(Metrics, store_packets_removed)
95-
}
96-
Ok(updated)
94+
pub async fn remove(&self, key: &PublicKeyBytes) -> Result<bool> {
95+
let db = self.db.clone();
96+
let key = *key;
97+
tokio::task::spawn_blocking(move || {
98+
let tx = db.begin_write()?;
99+
let updated = {
100+
let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?;
101+
let did_remove = table.remove(key.as_bytes())?.is_some();
102+
#[allow(clippy::let_and_return)]
103+
did_remove
104+
};
105+
tx.commit()?;
106+
if updated {
107+
inc!(Metrics, store_packets_removed)
108+
}
109+
Ok(updated)
110+
})
111+
.await?
97112
}
98113
}
99114

0 commit comments

Comments
 (0)