Skip to content

Commit 8c7d298

Browse files
authored
Adds Redis Pub/Sub integration (#80)
dds the ability to connect to Redis, subscribe to messages and publish them as well. See here for example usage. The API, prefixed with rustg_redis_: connect(addr) - Connects to a Redis instance using the given address, for example redis://127.0.0.1/. Returns an empty string on success, returns the error otherwise. disconnect() - Closes the connection to Redis and stops the thread managing it. Call this before restarting or attempting to reconnect after an error. subscribe(channel) - Subscribes to a given channel and starts receiving messages from it. get_messages() - Returns all received messages as a JSON string, in the format of {"channel_1": ["msg1", "msg2", "msg3", ...], "channel_2": ["msg1", "msg2", "msg3", ...]}. Also includes errors, which appear on the channel "RUSTG_REDIS_ERROR_CHANNEL". publish(channel, message) - Publishes a message on the given channel. Remember to check the error channel every time you call get_messages(). If any occur, you need to call disconnect(), then connect() again, then resubscribe to desired channels.
1 parent 097313d commit 8c7d298

File tree

5 files changed

+188
-2
lines changed

5 files changed

+188
-2
lines changed

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ png = { version = "0.16", optional = true }
3232
image = { version = "0.23.10", optional = true }
3333
git2 = { version = "0.13", optional = true, default-features = false }
3434
noise = { version = "0.7", optional = true}
35+
redis = { version = "0.21.4", optional = true }
3536
reqwest = { version = "0.11", optional = true, default-features = false, features = ["blocking", "rustls-tls"] }
3637
serde = { version = "1.0", optional = true, features = ["derive"] }
3738
serde_json = { version = "1.0", optional = true }
@@ -41,7 +42,7 @@ mysql = { version = "20.0", optional = true }
4142
dashmap = { version = "4.0", optional = true }
4243
zip = { version = "0.5.8", optional = true }
4344
rand = {version = "0.8", optional = true}
44-
dmsort = {version = "1.0.0", optional = true}
45+
dmsort = {version = "1.0.0", optional = true }
4546
toml-dep = { version = "0.5.8", package="toml", optional = true }
4647
aho-corasick = { version = "0.7.18", optional = true}
4748

@@ -64,6 +65,7 @@ url = ["url-dep", "percent-encoding"]
6465

6566
# non-default features
6667
hash = ["base64", "const-random", "md-5", "hex", "sha-1", "sha2", "twox-hash", "serde", "serde_json"]
68+
redis_pubsub = ["flume", "redis", "serde", "serde_json"]
6769
unzip = ["zip", "jobs"]
6870
worleynoise = ["rand","dmsort"]
6971

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,10 @@ The default features are:
9494

9595
Additional features are:
9696
* hash: Faster replacement for `md5`, support for SHA-1, SHA-256, and SHA-512. Requires OpenSSL on Linux.
97+
* redis_pubsub: Library for sending and receiving messages through Redis.
9798
* url: Faster replacements for `url_encode` and `url_decode`.
9899
* unzip: Function to download a .zip from a URL and unzip it to a directory.
99-
* worleynoise: Function that generates a type of nice looking cellular noise, more expensive than cellularnoise
100+
* worleynoise: Function that generates a type of nice looking cellular noise, more expensive than cellularnoise.
100101

101102
## Installing
102103

dmsrc/redis_pubsub.dm

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#define RUSTG_REDIS_ERROR_CHANNEL "RUSTG_REDIS_ERROR_CHANNEL"
2+
3+
#define rustg_redis_connect(addr) call(RUST_G, "redis_connect")(addr)
4+
/proc/rustg_redis_disconnect() return call(RUST_G, "redis_disconnect")()
5+
#define rustg_redis_subscribe(channel) call(RUST_G, "redis_subscribe")(channel)
6+
/proc/rustg_redis_get_messages() return call(RUST_G, "redis_get_messages")()
7+
#define rustg_redis_publish(channel, message) call(RUST_G, "redis_publish")(channel, message)

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ pub mod json;
2828
pub mod log;
2929
#[cfg(feature = "noise")]
3030
pub mod noise_gen;
31+
#[cfg(feature = "redis_pubsub")]
32+
pub mod redis_pubsub;
3133
#[cfg(feature = "sql")]
3234
pub mod sql;
3335
#[cfg(feature = "time")]

src/redis_pubsub.rs

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
use redis::{Client, Commands, RedisError};
2+
use std::cell::RefCell;
3+
use std::collections::HashMap;
4+
use std::thread;
5+
use std::time::Duration;
6+
7+
static ERROR_CHANNEL: &'static str = "RUSTG_REDIS_ERROR_CHANNEL";
8+
9+
thread_local! {
10+
static REQUEST_SENDER: RefCell<Option<flume::Sender<PubSubRequest>>> = RefCell::new(None);
11+
static RESPONSE_RECEIVER: RefCell<Option<flume::Receiver<PubSubResponse>>> = RefCell::new(None);
12+
}
13+
14+
enum PubSubRequest {
15+
Subscribe(String),
16+
Publish(String, String),
17+
}
18+
19+
// response might not be a good name, since those are not sent in response to requests
20+
enum PubSubResponse {
21+
Disconnected(String),
22+
Message(String, String),
23+
}
24+
25+
fn handle_redis_inner(
26+
client: Client,
27+
control: &flume::Receiver<PubSubRequest>,
28+
out: &flume::Sender<PubSubResponse>,
29+
) -> Result<(), RedisError> {
30+
let mut conn = client.get_connection()?;
31+
let mut pub_conn = client.get_connection()?;
32+
let mut pubsub = conn.as_pubsub();
33+
pubsub.set_read_timeout(Some(Duration::from_secs(1)))?;
34+
35+
loop {
36+
loop {
37+
match control.try_recv() {
38+
Ok(req) => match req {
39+
PubSubRequest::Subscribe(channel) => {
40+
pubsub.subscribe(&[channel.as_str()])?;
41+
}
42+
PubSubRequest::Publish(channel, message) => {
43+
pub_conn.publish(&channel, &message)?;
44+
}
45+
},
46+
Err(flume::TryRecvError::Empty) => break,
47+
Err(flume::TryRecvError::Disconnected) => return Ok(()),
48+
}
49+
}
50+
51+
if let Some(msg) = match pubsub.get_message() {
52+
Ok(msg) => Some(msg),
53+
Err(e) => {
54+
if e.is_timeout() {
55+
None
56+
} else {
57+
return Err(e);
58+
}
59+
}
60+
} {
61+
let chan = msg.get_channel_name().to_owned();
62+
let data: String = msg.get_payload().unwrap_or_default();
63+
if let Err(flume::TrySendError::Disconnected(_)) =
64+
out.try_send(PubSubResponse::Message(chan, data))
65+
{
66+
return Ok(()); // If no one wants to receive any more messages from us, we exit this thread
67+
}
68+
}
69+
}
70+
}
71+
72+
fn handle_redis(
73+
client: Client,
74+
control: flume::Receiver<PubSubRequest>,
75+
out: flume::Sender<PubSubResponse>,
76+
) {
77+
if let Err(e) = handle_redis_inner(client, &control, &out) {
78+
let _ = out.send(PubSubResponse::Disconnected(e.to_string()));
79+
}
80+
}
81+
82+
fn connect(addr: &str) -> Result<(), RedisError> {
83+
let client = redis::Client::open(addr)?;
84+
let _ = client.get_connection_with_timeout(Duration::from_secs(1))?;
85+
let (c_sender, c_receiver) = flume::bounded(1000);
86+
let (o_sender, o_receiver) = flume::bounded(1000);
87+
REQUEST_SENDER.with(|cell| cell.replace(Some(c_sender)));
88+
RESPONSE_RECEIVER.with(|cell| cell.replace(Some(o_receiver)));
89+
thread::spawn(|| handle_redis(client, c_receiver, o_sender));
90+
Ok(())
91+
}
92+
93+
fn disconnect() {
94+
// Dropping the sender and receiver will cause the other thread to exit
95+
REQUEST_SENDER.with(|cell| {
96+
cell.replace(None);
97+
});
98+
RESPONSE_RECEIVER.with(|cell| {
99+
cell.replace(None);
100+
});
101+
}
102+
103+
// It's lame as hell to use strings as errors, but I don't feel like
104+
// making a whole new type encompassing possible errors, since we're returning a string
105+
// to BYOND anyway.
106+
fn subscribe(channel: &str) -> Option<String> {
107+
return REQUEST_SENDER.with(|cell| {
108+
if let Some(chan) = cell.borrow_mut().as_ref() {
109+
return chan
110+
.try_send(PubSubRequest::Subscribe(channel.to_owned()))
111+
.err()
112+
.map(|e| e.to_string());
113+
};
114+
Some("Not connected".to_owned())
115+
});
116+
}
117+
118+
fn publish(channel: &str, msg: &str) -> Option<String> {
119+
return REQUEST_SENDER.with(|cell| {
120+
if let Some(chan) = cell.borrow_mut().as_ref() {
121+
return chan
122+
.try_send(PubSubRequest::Publish(channel.to_owned(), msg.to_owned()))
123+
.err()
124+
.map(|e| e.to_string());
125+
};
126+
Some("Not connected".to_owned())
127+
});
128+
}
129+
130+
fn get_messages() -> String {
131+
let mut result: HashMap<String, Vec<String>> = HashMap::new();
132+
RESPONSE_RECEIVER.with(|cell| {
133+
let opt = cell.borrow_mut();
134+
if let Some(recv) = opt.as_ref() {
135+
for resp in recv.try_iter() {
136+
match resp {
137+
PubSubResponse::Message(chan, msg) => {
138+
result.entry(chan).or_default().push(msg);
139+
}
140+
PubSubResponse::Disconnected(error) => {
141+
// Pardon the in-band signaling but it's probably the best way to do this
142+
result
143+
.entry(ERROR_CHANNEL.to_owned())
144+
.or_default()
145+
.push(error);
146+
}
147+
}
148+
}
149+
}
150+
});
151+
152+
serde_json::to_string(&result).unwrap_or("{}".to_owned())
153+
}
154+
155+
byond_fn! { redis_connect(addr) {
156+
connect(addr).err().map(|e| e.to_string())
157+
} }
158+
159+
byond_fn! { redis_disconnect() {
160+
disconnect();
161+
Some("")
162+
} }
163+
164+
byond_fn! { redis_subscribe(channel) {
165+
subscribe(channel)
166+
} }
167+
168+
byond_fn! { redis_get_messages() {
169+
Some(get_messages())
170+
} }
171+
172+
byond_fn! { redis_publish(channel, message) {
173+
publish(channel, message)
174+
} }

0 commit comments

Comments
 (0)