Skip to content

Commit 15e53a1

Browse files
committed
filter: add forwarder filter
Fixes: damus-io#10 Changelog-Added: Add forwarder filter Signed-off-by: William Casarin <[email protected]>
1 parent 794468d commit 15e53a1

File tree

8 files changed

+169
-2
lines changed

8 files changed

+169
-2
lines changed

Makefile

+2
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
tags:
22
find src -name '*.rs' | xargs ctags
3+
4+
.PHONY: tags

README.md

+12
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,18 @@ There are no config options, but an empty config entry is still needed:
131131

132132
`[filters.protected_events]`
133133

134+
### Forwarder
135+
136+
* name: `forwarder`
137+
138+
The forwarder filter allows you to forward notes to another relay. Notes will
139+
be queued if the connection goes down (up to the `queue_size` buffer limit)
140+
141+
- `relay` - the relay to forward notes to, eg: `ws://localhost:8080`
142+
143+
- `queue_size` *optional* - size of the note queue, this is used to buffer notes if the connection goes down
144+
145+
134146
## Testing
135147

136148
You can test your filters like so:

noteguard-forwarder.toml

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
2+
pipeline = ["ratelimit", "forwarder"]
3+
4+
[filters.forwarder]
5+
relay = "ws://127.0.0.1:8080"
6+
7+
[filters.ratelimit]
8+
posts_per_minute = 3

src/filters/forwarder.rs

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
use serde::Deserialize;
2+
use crate::{Note, Action, NoteFilter, InputMessage, OutputMessage};
3+
use futures_util::{SinkExt, StreamExt};
4+
use tokio::sync::mpsc::{self, Sender, Receiver};
5+
use tokio_tungstenite::connect_async;
6+
use tokio_tungstenite::tungstenite::Message;
7+
use tokio_tungstenite::WebSocketStream;
8+
use tokio::time::{sleep, timeout, Duration};
9+
use serde_json::json;
10+
use log::{error, info, debug};
11+
12+
#[derive(Default, Deserialize)]
13+
pub struct Forwarder {
14+
relay: String,
15+
16+
/// the size of our bounded queue
17+
queue_size: Option<u32>,
18+
19+
/// The channel used for communicating with the forwarder thread
20+
#[serde(skip)]
21+
channel: Option<Sender<Note>>,
22+
}
23+
24+
async fn client_reconnect(relay: &str) -> WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>> {
25+
loop {
26+
match connect_async(relay).await {
27+
Err(e) => {
28+
error!("failed to connect to relay {}: {}", relay, e);
29+
sleep(Duration::from_secs(5)).await;
30+
continue;
31+
}
32+
Ok((ws, _)) => {
33+
info!("connected to relay: {}", relay);
34+
return ws;
35+
}
36+
}
37+
}
38+
}
39+
40+
async fn forwarder_task(relay: String, mut rx: Receiver<Note>) {
41+
let stream = client_reconnect(&relay).await;
42+
let (mut writer, mut reader) = stream.split();
43+
44+
loop {
45+
tokio::select! {
46+
result = timeout(Duration::from_secs(10), rx.recv()) => {
47+
match result {
48+
Ok(Some(note)) => {
49+
if let Err(e) = writer.send(Message::Text(serde_json::to_string(&json!(["EVENT", note])).unwrap())).await {
50+
error!("got error: '{}', reconnecting...", e);
51+
let (w, r) = client_reconnect(&relay).await.split();
52+
writer = w;
53+
reader = r;
54+
}
55+
},
56+
Ok(None) => {
57+
// Channel has been closed, exit the loop
58+
error!("channel closed, stopping forwarder_task");
59+
break;
60+
}
61+
Err(_) => {
62+
// Timeout occurred, send a ping
63+
// try reading for pongs, etc
64+
let _r = reader.next();
65+
debug!("timeout reading note queue, sending ping");
66+
67+
if let Err(e) = writer.send(Message::Ping(vec![])).await {
68+
error!("error during ping ({}), reconnecting...", e);
69+
let (w, r) = client_reconnect(&relay).await.split();
70+
writer = w;
71+
reader = r;
72+
}
73+
}
74+
}
75+
}
76+
}
77+
}
78+
}
79+
80+
impl NoteFilter for Forwarder {
81+
fn name(&self) -> &'static str {
82+
"forwarder"
83+
}
84+
85+
fn filter_note(&mut self, input: &InputMessage) -> OutputMessage {
86+
if self.channel.is_none() {
87+
let (tx, rx) = mpsc::channel(self.queue_size.unwrap_or(1000) as usize);
88+
let relay = self.relay.clone();
89+
90+
tokio::task::spawn(async move {
91+
forwarder_task(relay, rx).await;
92+
});
93+
94+
self.channel = Some(tx);
95+
}
96+
97+
// Add code to process input and send through channel
98+
if let Some(ref channel) = self.channel {
99+
if let Err(e) = channel.try_send(input.event.clone()) {
100+
eprintln!("could not forward note: {}", e);
101+
}
102+
}
103+
104+
// Create and return an appropriate OutputMessage
105+
OutputMessage::new(input.event.id.clone(), Action::Accept, None)
106+
}
107+
}

src/filters/mod.rs

+6
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,13 @@ mod protected_events;
33
mod ratelimit;
44
mod whitelist;
55

6+
#[cfg(feature = "forwarder")]
7+
mod forwarder;
8+
69
pub use kinds::Kinds;
710
pub use protected_events::ProtectedEvents;
811
pub use ratelimit::RateLimit;
912
pub use whitelist::Whitelist;
13+
14+
#[cfg(feature = "forwarder")]
15+
pub use forwarder::Forwarder;

src/main.rs

+22
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
use noteguard::filters::{Kinds, ProtectedEvents, RateLimit, Whitelist};
2+
3+
#[cfg(feature = "forwarder")]
4+
use noteguard::filters::Forwarder;
5+
26
use noteguard::{Action, InputMessage, NoteFilter, OutputMessage};
37
use serde::de::DeserializeOwned;
48
use serde::Deserialize;
59
use std::collections::HashMap;
610
use std::io::{self, BufRead, Read, Write};
11+
use log::info;
712

813
#[derive(Deserialize)]
914
struct Config {
@@ -44,6 +49,9 @@ impl Noteguard {
4449
self.register_filter::<Whitelist>();
4550
self.register_filter::<ProtectedEvents>();
4651
self.register_filter::<Kinds>();
52+
53+
#[cfg(feature = "forwarder")]
54+
self.register_filter::<Forwarder>();
4755
}
4856

4957
/// Run the loaded filters. You must call `load_config` before calling this, otherwise
@@ -94,7 +102,21 @@ impl Noteguard {
94102
}
95103
}
96104

105+
#[cfg(feature = "forwarder")]
106+
#[tokio::main]
107+
async fn main() {
108+
noteguard();
109+
}
110+
111+
#[cfg(not(feature = "forwarder"))]
97112
fn main() {
113+
noteguard();
114+
}
115+
116+
fn noteguard() {
117+
env_logger::init();
118+
info!("running noteguard");
119+
98120
let config_path = "noteguard.toml";
99121
let mut noteguard = Noteguard::new();
100122

src/note_filter.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{InputMessage, OutputMessage};
2-
use serde::Deserialize;
2+
use serde::{Deserialize, Serialize};
33

4-
#[derive(Deserialize)]
4+
#[derive(Deserialize, Serialize, Clone)]
55
pub struct Note {
66
pub id: String,
77
pub pubkey: String,

test/delayed-nostril

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#!/usr/bin/env sh
2+
3+
while true
4+
do
5+
note="$(nostril --silent --content hello)"
6+
echo "{\"type\": \"new\",\"receivedAt\":12345,\"sourceType\":\"IP4\",\"sourceInfo\": \"127.0.0.2\",\"event\":$note}"
7+
8+
sleep ${1:-0.1}
9+
done
10+

0 commit comments

Comments
 (0)