Skip to content

Commit cd9c188

Browse files
rklaehn, dignifiedquire, Arqu
authored
refactor(iroh-dns-server): Move db ops into an actor and implement write batching (#2995)
## Description Batch to make write go brrr.... - move all db interactions into an actor - inside the actor, have two nested loops, inner loop keeps write txn open - txn is closed when a number of msgs have been written or some time has elapsed, whatever comes first ## Breaking Changes None ## Benches This branch: ``` Benchmarking dns_server_writes/1000: Collecting 10 samples in estimated 9.4698 s dns_server_writes/1000 time: [171.28 ms 174.05 ms 176.58 ms] thrpt: [5.6631 Kelem/s 5.7455 Kelem/s 5.8384 Kelem/s] change: time: [-6.8425% -3.3112% -0.0384%] (p = 0.10 > 0.05) thrpt: [+0.0384% +3.4246% +7.3451%] No change in performance detected. ``` arqu/dns-bench: ``` Benchmarking dns_server_writes/1000: Warming up for 3.0000 s dns_server_writes/1000 time: [6.2530 s 6.3920 s 6.5351 s] thrpt: [153.02 elem/s 156.45 elem/s 159.92 elem/s] change: time: [+3881.0% +3974.8% +4080.8%] (p = 0.00 < 0.05) thrpt: [-97.608% -97.546% -97.488%] Performance has regressed. ``` ## Downsides The downside of write batching is that in case of a crash or hard program termination we lose the last MAX_BATCH_TIME seconds and MAX_BATCH_SIZE writes. I think that this is acceptable since discovery information is republished. --------- Co-authored-by: dignifiedquire <[email protected]> Co-authored-by: Asmir Avdicevic <[email protected]>
1 parent 26c5248 commit cd9c188

File tree

5 files changed

+224
-59
lines changed

5 files changed

+224
-59
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-dns-server/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
# iroh-dns-server/Cargo.toml — state after this commit.
# Two changed hunks; unchanged lines between them are elided.

serde = { version = "1", features = ["derive"] }
struct_iterable = "0.1.1"
strum = { version = "0.26", features = ["derive"] }
# relaxed from "=1.36.0"-style pin so downstream users are not held to a patch release
tokio = { version = "1", features = ["full"] }
tokio-rustls = { version = "0.26", default-features = false, features = [
    "logging",
    "ring",

# ... (unchanged lines elided) ...

url = "2.5"
z32 = "1.1.1"

[dev-dependencies]
criterion = "0.5.1"
hickory-resolver = "=0.25.0-alpha.2"
iroh = { version = "0.29.0", path = "../iroh" }
iroh-test = { version = "0.29.0", path = "../iroh-test" }
pkarr = { version = "2.2.0", features = ["rand"] }

[[bench]]
name = "write"
harness = false

[package.metadata.docs.rs]
all-features = true

iroh-dns-server/benches/write.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use anyhow::Result;
2+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
3+
use iroh::{discovery::pkarr::PkarrRelayClient, dns::node_info::NodeInfo, key::SecretKey};
4+
use iroh_dns_server::{config::Config, server::Server, ZoneStore};
5+
use tokio::runtime::Runtime;
6+
7+
const LOCALHOST_PKARR: &str = "http://localhost:8080/pkarr";
8+
9+
async fn start_dns_server(config: Config) -> Result<Server> {
10+
let store = ZoneStore::persistent(Config::signed_packet_store_path()?)?;
11+
Server::spawn(config, store).await
12+
}
13+
14+
fn benchmark_dns_server(c: &mut Criterion) {
15+
let mut group = c.benchmark_group("dns_server_writes");
16+
group.sample_size(10);
17+
for iters in [10_u64, 100_u64, 250_u64, 1000_u64].iter() {
18+
group.throughput(Throughput::Elements(*iters));
19+
group.bench_with_input(BenchmarkId::from_parameter(iters), iters, |b, &iters| {
20+
b.iter(|| {
21+
let rt = Runtime::new().unwrap();
22+
rt.block_on(async move {
23+
let config = Config::load("./config.dev.toml").await.unwrap();
24+
let server = start_dns_server(config).await.unwrap();
25+
26+
let secret_key = SecretKey::generate();
27+
let node_id = secret_key.public();
28+
29+
let pkarr_relay = LOCALHOST_PKARR.parse().expect("valid url");
30+
let relay_url = Some("http://localhost:8080".parse().unwrap());
31+
let pkarr = PkarrRelayClient::new(pkarr_relay);
32+
let node_info = NodeInfo::new(node_id, relay_url, Default::default());
33+
let signed_packet = node_info.to_pkarr_signed_packet(&secret_key, 30).unwrap();
34+
35+
let start = std::time::Instant::now();
36+
for _ in 0..iters {
37+
pkarr.publish(&signed_packet).await.unwrap();
38+
}
39+
let duration = start.elapsed();
40+
41+
server.shutdown().await.unwrap();
42+
43+
duration
44+
})
45+
});
46+
});
47+
}
48+
}
49+
50+
criterion_group!(benches, benchmark_dns_server);
51+
criterion_main!(benches);

iroh-dns-server/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ pub mod state;
1111
mod store;
1212
mod util;
1313

14+
// Re-export to be able to construct your own dns-server
15+
pub use store::ZoneStore;
16+
1417
#[cfg(test)]
1518
mod tests {
1619
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};

iroh-dns-server/src/store/signed_packets.rs

Lines changed: 163 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,145 @@
1-
use std::{path::Path, sync::Arc};
1+
use std::{path::Path, result, time::Duration};
22

33
use anyhow::{Context, Result};
44
use iroh_metrics::inc;
55
use pkarr::SignedPacket;
66
use redb::{backends::InMemoryBackend, Database, ReadableTable, TableDefinition};
7+
use tokio::sync::{mpsc, oneshot};
8+
use tokio_util::sync::CancellationToken;
79
use tracing::info;
810

911
use crate::{metrics::Metrics, util::PublicKeyBytes};
1012

1113
pub type SignedPacketsKey = [u8; 32];
1214
const SIGNED_PACKETS_TABLE: TableDefinition<&SignedPacketsKey, &[u8]> =
1315
TableDefinition::new("signed-packets-1");
16+
const MAX_BATCH_SIZE: usize = 1024 * 64;
17+
const MAX_BATCH_TIME: Duration = Duration::from_secs(1);
1418

1519
#[derive(Debug)]
1620
pub struct SignedPacketStore {
17-
db: Arc<Database>,
21+
send: mpsc::Sender<Message>,
22+
cancel: CancellationToken,
23+
thread: Option<std::thread::JoinHandle<()>>,
24+
}
25+
26+
impl Drop for SignedPacketStore {
27+
fn drop(&mut self) {
28+
// cancel the actor
29+
self.cancel.cancel();
30+
// join the thread. This is important so that Drop implementations that
31+
// are called from the actor thread can complete before we return.
32+
if let Some(thread) = self.thread.take() {
33+
let _ = thread.join();
34+
}
35+
}
36+
}
37+
38+
enum Message {
39+
Upsert {
40+
packet: SignedPacket,
41+
res: oneshot::Sender<bool>,
42+
},
43+
Get {
44+
key: PublicKeyBytes,
45+
res: oneshot::Sender<Option<SignedPacket>>,
46+
},
47+
Remove {
48+
key: PublicKeyBytes,
49+
res: oneshot::Sender<bool>,
50+
},
51+
}
52+
53+
struct Actor {
54+
db: Database,
55+
recv: mpsc::Receiver<Message>,
56+
cancel: CancellationToken,
57+
max_batch_size: usize,
58+
max_batch_time: Duration,
59+
}
60+
61+
impl Actor {
62+
async fn run(mut self) {
63+
match self.run0().await {
64+
Ok(()) => {}
65+
Err(e) => {
66+
self.cancel.cancel();
67+
tracing::error!("packet store actor failed: {:?}", e);
68+
}
69+
}
70+
}
71+
72+
async fn run0(&mut self) -> anyhow::Result<()> {
73+
loop {
74+
let transaction = self.db.begin_write()?;
75+
let mut tables = Tables::new(&transaction)?;
76+
let timeout = tokio::time::sleep(self.max_batch_time);
77+
tokio::pin!(timeout);
78+
for _ in 0..self.max_batch_size {
79+
tokio::select! {
80+
_ = self.cancel.cancelled() => {
81+
drop(tables);
82+
transaction.commit()?;
83+
return Ok(());
84+
}
85+
_ = &mut timeout => break,
86+
Some(msg) = self.recv.recv() => {
87+
match msg {
88+
Message::Get { key, res } => {
89+
let packet = get_packet(&tables.signed_packets, &key)?;
90+
res.send(packet).ok();
91+
}
92+
Message::Upsert { packet, res } => {
93+
let key = PublicKeyBytes::from_signed_packet(&packet);
94+
let mut replaced = false;
95+
if let Some(existing) = get_packet(&tables.signed_packets, &key)? {
96+
if existing.more_recent_than(&packet) {
97+
res.send(false).ok();
98+
continue;
99+
} else {
100+
replaced = true;
101+
}
102+
}
103+
let value = packet.as_bytes();
104+
tables.signed_packets.insert(key.as_bytes(), &value[..])?;
105+
if replaced {
106+
inc!(Metrics, store_packets_updated);
107+
} else {
108+
inc!(Metrics, store_packets_inserted);
109+
}
110+
res.send(true).ok();
111+
}
112+
Message::Remove { key, res } => {
113+
let updated =
114+
tables.signed_packets.remove(key.as_bytes())?.is_some()
115+
;
116+
if updated {
117+
inc!(Metrics, store_packets_removed);
118+
}
119+
res.send(updated).ok();
120+
}
121+
}
122+
}
123+
}
124+
}
125+
drop(tables);
126+
transaction.commit()?;
127+
}
128+
}
129+
}
130+
131+
/// A struct similar to [`redb::Table`] but for all tables that make up the
132+
/// signed packet store.
133+
pub(super) struct Tables<'a> {
134+
pub signed_packets: redb::Table<'a, &'static SignedPacketsKey, &'static [u8]>,
135+
}
136+
137+
impl<'txn> Tables<'txn> {
138+
pub fn new(tx: &'txn redb::WriteTransaction) -> result::Result<Self, redb::TableError> {
139+
Ok(Self {
140+
signed_packets: tx.open_table(SIGNED_PACKETS_TABLE)?,
141+
})
142+
}
18143
}
19144

20145
impl SignedPacketStore {
@@ -42,73 +167,53 @@ impl SignedPacketStore {
42167
}
43168

44169
pub fn open(db: Database) -> Result<Self> {
170+
// create tables
45171
let write_tx = db.begin_write()?;
46-
{
47-
let _table = write_tx.open_table(SIGNED_PACKETS_TABLE)?;
48-
}
172+
let _ = Tables::new(&write_tx)?;
49173
write_tx.commit()?;
50-
Ok(Self { db: Arc::new(db) })
174+
let (send, recv) = mpsc::channel(1024);
175+
let cancel = CancellationToken::new();
176+
let cancel2 = cancel.clone();
177+
let actor = Actor {
178+
db,
179+
recv,
180+
cancel: cancel2,
181+
max_batch_size: MAX_BATCH_SIZE,
182+
max_batch_time: MAX_BATCH_TIME,
183+
};
184+
// start an io thread and donate it to the tokio runtime so we can do blocking IO
185+
// inside the thread despite being in a tokio runtime
186+
let handle = tokio::runtime::Handle::try_current()?;
187+
let thread = std::thread::Builder::new()
188+
.name("packet-store-actor".into())
189+
.spawn(move || {
190+
handle.block_on(actor.run());
191+
})?;
192+
Ok(Self {
193+
send,
194+
cancel,
195+
thread: Some(thread),
196+
})
51197
}
52198

53199
pub async fn upsert(&self, packet: SignedPacket) -> Result<bool> {
54-
let key = PublicKeyBytes::from_signed_packet(&packet);
55-
let db = self.db.clone();
56-
tokio::task::spawn_blocking(move || {
57-
let tx = db.begin_write()?;
58-
let mut replaced = false;
59-
{
60-
let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?;
61-
if let Some(existing) = get_packet(&table, &key)? {
62-
if existing.more_recent_than(&packet) {
63-
return Ok(false);
64-
} else {
65-
replaced = true;
66-
}
67-
}
68-
let value = packet.as_bytes();
69-
table.insert(key.as_bytes(), &value[..])?;
70-
}
71-
tx.commit()?;
72-
if replaced {
73-
inc!(Metrics, store_packets_updated);
74-
} else {
75-
inc!(Metrics, store_packets_inserted);
76-
}
77-
Ok(true)
78-
})
79-
.await?
200+
let (tx, rx) = oneshot::channel();
201+
self.send.send(Message::Upsert { packet, res: tx }).await?;
202+
Ok(rx.await?)
80203
}
81204

82205
pub async fn get(&self, key: &PublicKeyBytes) -> Result<Option<SignedPacket>> {
83-
let db = self.db.clone();
84-
let key = *key;
85-
let res = tokio::task::spawn_blocking(move || {
86-
let tx = db.begin_read()?;
87-
let table = tx.open_table(SIGNED_PACKETS_TABLE)?;
88-
get_packet(&table, &key)
89-
})
90-
.await??;
91-
Ok(res)
206+
let (tx, rx) = oneshot::channel();
207+
self.send.send(Message::Get { key: *key, res: tx }).await?;
208+
Ok(rx.await?)
92209
}
93210

94211
pub async fn remove(&self, key: &PublicKeyBytes) -> Result<bool> {
95-
let db = self.db.clone();
96-
let key = *key;
97-
tokio::task::spawn_blocking(move || {
98-
let tx = db.begin_write()?;
99-
let updated = {
100-
let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?;
101-
let did_remove = table.remove(key.as_bytes())?.is_some();
102-
#[allow(clippy::let_and_return)]
103-
did_remove
104-
};
105-
tx.commit()?;
106-
if updated {
107-
inc!(Metrics, store_packets_removed)
108-
}
109-
Ok(updated)
110-
})
111-
.await?
212+
let (tx, rx) = oneshot::channel();
213+
self.send
214+
.send(Message::Remove { key: *key, res: tx })
215+
.await?;
216+
Ok(rx.await?)
112217
}
113218
}
114219

0 commit comments

Comments
 (0)