Skip to content

fix(iroh-dns-server): remove accidental blocking from store #2985

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions iroh-dns-server/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use anyhow::Result;
use hickory_proto::rr::{Name, RecordSet, RecordType, RrKey};
use iroh_metrics::inc;
use lru::LruCache;
use parking_lot::Mutex;
use pkarr::{mainline::dht::DhtSettings, PkarrClient, SignedPacket};
use tokio::sync::Mutex;
use tracing::{debug, trace};
use ttl_cache::TtlCache;

Expand Down Expand Up @@ -97,14 +97,15 @@ impl ZoneStore {
record_type: RecordType,
) -> Result<Option<Arc<RecordSet>>> {
tracing::info!("{} {}", name, record_type);
if let Some(rset) = self.cache.lock().resolve(pubkey, name, record_type) {
if let Some(rset) = self.cache.lock().await.resolve(pubkey, name, record_type) {
return Ok(Some(rset));
}

if let Some(packet) = self.store.get(pubkey)? {
if let Some(packet) = self.store.get(pubkey).await? {
return self
.cache
.lock()
.await
.insert_and_resolve(&packet, name, record_type);
};

Expand All @@ -120,6 +121,7 @@ impl ZoneStore {
return self
.cache
.lock()
.await
.insert_and_resolve_dht(&packet, name, record_type);
} else {
debug!("DHT resolve failed");
Expand All @@ -132,7 +134,7 @@ impl ZoneStore {
// allow unused async: this will be async soon.
#[allow(clippy::unused_async)]
pub async fn get_signed_packet(&self, pubkey: &PublicKeyBytes) -> Result<Option<SignedPacket>> {
self.store.get(pubkey)
self.store.get(pubkey).await
}

/// Insert a signed packet into the cache and the store.
Expand All @@ -143,9 +145,9 @@ impl ZoneStore {
#[allow(clippy::unused_async)]
pub async fn insert(&self, signed_packet: SignedPacket, _source: PacketSource) -> Result<bool> {
let pubkey = PublicKeyBytes::from_signed_packet(&signed_packet);
if self.store.upsert(signed_packet)? {
if self.store.upsert(signed_packet).await? {
inc!(Metrics, pkarr_publish_update);
self.cache.lock().remove(&pubkey);
self.cache.lock().await.remove(&pubkey);
Ok(true)
} else {
inc!(Metrics, pkarr_publish_noop);
Expand Down
95 changes: 55 additions & 40 deletions iroh-dns-server/src/store/signed_packets.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::path::Path;
use std::{path::Path, sync::Arc};

use anyhow::{Context, Result};
use iroh_metrics::inc;
Expand All @@ -14,7 +14,7 @@ const SIGNED_PACKETS_TABLE: TableDefinition<&SignedPacketsKey, &[u8]> =

#[derive(Debug)]
pub struct SignedPacketStore {
db: Database,
db: Arc<Database>,
}
Copy link
Member

@matheus23 matheus23 Dec 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redb should really implement Clone for Database.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean Clone? This is how we do it with the protocols, they don't (need to) impl Clone, and people have to wrap them in an Arc... 🤣

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sorry, I edited my comment :D


impl SignedPacketStore {
Expand Down Expand Up @@ -47,53 +47,68 @@ impl SignedPacketStore {
let _table = write_tx.open_table(SIGNED_PACKETS_TABLE)?;
}
write_tx.commit()?;
Ok(Self { db })
Ok(Self { db: Arc::new(db) })
}

pub fn upsert(&self, packet: SignedPacket) -> Result<bool> {
pub async fn upsert(&self, packet: SignedPacket) -> Result<bool> {
let key = PublicKeyBytes::from_signed_packet(&packet);
let tx = self.db.begin_write()?;
let mut replaced = false;
{
let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?;
if let Some(existing) = get_packet(&table, &key)? {
if existing.more_recent_than(&packet) {
return Ok(false);
} else {
replaced = true;
let db = self.db.clone();
tokio::task::spawn_blocking(move || {
let tx = db.begin_write()?;
let mut replaced = false;
{
let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?;
if let Some(existing) = get_packet(&table, &key)? {
if existing.more_recent_than(&packet) {
return Ok(false);
} else {
replaced = true;
}
}
let value = packet.as_bytes();
table.insert(key.as_bytes(), &value[..])?;
}
let value = packet.as_bytes();
table.insert(key.as_bytes(), &value[..])?;
}
tx.commit()?;
if replaced {
inc!(Metrics, store_packets_updated);
} else {
inc!(Metrics, store_packets_inserted);
}
Ok(true)
tx.commit()?;
if replaced {
inc!(Metrics, store_packets_updated);
} else {
inc!(Metrics, store_packets_inserted);
}
Ok(true)
})
.await?
}

pub fn get(&self, key: &PublicKeyBytes) -> Result<Option<SignedPacket>> {
let tx = self.db.begin_read()?;
let table = tx.open_table(SIGNED_PACKETS_TABLE)?;
get_packet(&table, key)
pub async fn get(&self, key: &PublicKeyBytes) -> Result<Option<SignedPacket>> {
let db = self.db.clone();
let key = *key;
let res = tokio::task::spawn_blocking(move || {
let tx = db.begin_read()?;
let table = tx.open_table(SIGNED_PACKETS_TABLE)?;
get_packet(&table, &key)
})
.await??;
Ok(res)
}

pub fn remove(&self, key: &PublicKeyBytes) -> Result<bool> {
let tx = self.db.begin_write()?;
let updated = {
let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?;
let did_remove = table.remove(key.as_bytes())?.is_some();
#[allow(clippy::let_and_return)]
did_remove
};
tx.commit()?;
if updated {
inc!(Metrics, store_packets_removed)
}
Ok(updated)
pub async fn remove(&self, key: &PublicKeyBytes) -> Result<bool> {
let db = self.db.clone();
let key = *key;
tokio::task::spawn_blocking(move || {
let tx = db.begin_write()?;
let updated = {
let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?;
let did_remove = table.remove(key.as_bytes())?.is_some();
#[allow(clippy::let_and_return)]
did_remove
};
tx.commit()?;
if updated {
inc!(Metrics, store_packets_removed)
}
Ok(updated)
})
.await?
}
}

Expand Down
Loading