-
Notifications
You must be signed in to change notification settings - Fork 106
Adds Redis Pub/Sub integration #80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
279c90d
d5d67ff
d628fae
bdaaa97
a360fcd
b9ab17d
17c6251
1fc0a22
be5d478
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
[package] | ||
name = "rust-g" | ||
edition = "2018" | ||
version = "0.5.0" | ||
version = "0.6.0" | ||
authors = ["Bjorn Neergaard <[email protected]>"] | ||
repository = "https://github.com/tgstation/rust-g" | ||
license-file = "LICENSE" | ||
|
@@ -41,8 +41,9 @@ mysql = { version = "20.0", optional = true } | |
dashmap = { version = "4.0", optional = true } | ||
zip = { version = "0.5.8", optional = true } | ||
rand = {version = "0.8", optional = true} | ||
dmsort = {version = "1.0.0", optional = true} | ||
dmsort = {version = "1.0.0", optional = true } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also you might want to bump the version of the package itself I guess with semver this would be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why would you include the patch version There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I assumed that was standard with semver. RUSTG has always been 0.x.0 |
||
toml-dep = { version = "0.5.8", package="toml", optional = true } | ||
redis = { version = "0.21.4", optional = true } | ||
|
||
[features] | ||
default = ["cellularnoise", "dmi", "file", "git", "http", "json", "log", "noise", "sql", "time", "toml", "url"] | ||
|
@@ -64,6 +65,7 @@ url = ["url-dep", "percent-encoding"] | |
hash = ["base64", "const-random", "md-5", "hex", "sha-1", "sha2", "twox-hash", "serde", "serde_json"] | ||
unzip = ["zip", "jobs"] | ||
worleynoise = ["rand","dmsort"] | ||
redis_pubsub = ["flume", "redis", "serde", "serde_json"] | ||
MCHSL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# internal feature-like things | ||
jobs = ["flume"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
#define RUSTG_REDIS_ERROR_CHANNEL "RUSTG_REDIS_ERROR_CHANNEL" | ||
|
||
#define rustg_redis_connect(addr) call(RUST_G, "redis_connect")(addr) | ||
/proc/rustg_redis_disconnect() return call(RUST_G, "redis_disconnect")() | ||
#define rustg_redis_subscribe(channel) call(RUST_G, "redis_subscribe")(channel) | ||
/proc/rustg_redis_get_messages() return call(RUST_G, "redis_get_messages")() | ||
#define rustg_redis_publish(channel, message) call(RUST_G, "redis_publish")(channel, message) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
use redis::{Client, Commands, RedisError}; | ||
use std::cell::RefCell; | ||
use std::collections::HashMap; | ||
use std::thread; | ||
use std::time::Duration; | ||
|
||
static ERROR_CHANNEL: &'static str = "RUSTG_REDIS_ERROR_CHANNEL"; | ||
|
||
thread_local! { | ||
static REQUEST_SENDER: RefCell<Option<flume::Sender<PubSubRequest>>> = RefCell::new(None); | ||
static RESPONSE_RECEIVER: RefCell<Option<flume::Receiver<PubSubResponse>>> = RefCell::new(None); | ||
} | ||
|
||
enum PubSubRequest { | ||
Subscribe(String), | ||
Publish(String, String), | ||
} | ||
|
||
// response might not be a good name, since those are not sent in response to requests | ||
enum PubSubResponse { | ||
Disconnected(String), | ||
Message(String, String), | ||
} | ||
|
||
fn handle_redis_inner( | ||
client: Client, | ||
control: &flume::Receiver<PubSubRequest>, | ||
out: &flume::Sender<PubSubResponse>, | ||
) -> Result<(), RedisError> { | ||
let mut conn = client.get_connection()?; | ||
let mut pub_conn = client.get_connection()?; | ||
let mut pubsub = conn.as_pubsub(); | ||
let _ = pubsub.set_read_timeout(Some(Duration::from_secs(1))); | ||
MCHSL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
loop { | ||
loop { | ||
match control.try_recv() { | ||
Ok(req) => match req { | ||
PubSubRequest::Subscribe(channel) => { | ||
pubsub.subscribe(&[channel.as_str()])?; | ||
} | ||
PubSubRequest::Publish(channel, message) => { | ||
pub_conn.publish(&channel, &message)?; | ||
} | ||
}, | ||
Err(flume::TryRecvError::Empty) => break, | ||
Err(flume::TryRecvError::Disconnected) => return Ok(()), | ||
} | ||
} | ||
|
||
if let Some(msg) = match pubsub.get_message() { | ||
Ok(msg) => Some(msg), | ||
Err(e) => { | ||
if e.is_timeout() { | ||
None | ||
} else { | ||
return Err(e); | ||
} | ||
} | ||
} { | ||
let chan = msg.get_channel_name().to_owned(); | ||
let data: String = msg.get_payload().unwrap_or_default(); | ||
if let Err(flume::TrySendError::Disconnected(_)) = | ||
out.try_send(PubSubResponse::Message(chan, data)) | ||
{ | ||
return Ok(()); // If no one wants to receive any more messages from us, we exit this thread | ||
} | ||
} | ||
} | ||
} | ||
|
||
fn handle_redis( | ||
client: Client, | ||
control: flume::Receiver<PubSubRequest>, | ||
out: flume::Sender<PubSubResponse>, | ||
) { | ||
if let Err(e) = handle_redis_inner(client, &control, &out) { | ||
let _ = out.try_send(PubSubResponse::Disconnected(e.to_string())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we silencing potential errors here? This is spooky. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could fail if the receiving end was dropped, in which case there's no need (or way) to communicate back the error, or if the channel was full, in which case the DM side was probably FUBAR. I suppose it doesn't hurt to make it block until there's space or if the receiver is dropped. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this blocks, how is the queue ever going to clear if DM cant execute anything to clear the queue due to being blocked by this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This runs on the redis thread side, it won't block DM |
||
} | ||
} | ||
|
||
fn connect(addr: &str) -> Result<(), RedisError> { | ||
let client = redis::Client::open(addr)?; | ||
let _ = client.get_connection_with_timeout(Duration::from_secs(1))?; | ||
let (c_sender, c_receiver) = flume::bounded(1000); | ||
let (o_sender, o_receiver) = flume::bounded(1000); | ||
REQUEST_SENDER.with(|cell| cell.replace(Some(c_sender))); | ||
RESPONSE_RECEIVER.with(|cell| cell.replace(Some(o_receiver))); | ||
thread::spawn(|| handle_redis(client, c_receiver, o_sender)); | ||
Ok(()) | ||
} | ||
|
||
fn disconnect() { | ||
// Dropping the sender and receiver will cause the other thread to exit | ||
REQUEST_SENDER.with(|cell| { | ||
cell.replace(None); | ||
}); | ||
RESPONSE_RECEIVER.with(|cell| { | ||
cell.replace(None); | ||
}); | ||
} | ||
|
||
fn subscribe(channel: &str) { | ||
REQUEST_SENDER.with(|cell| { | ||
if let Some(chan) = cell.borrow_mut().as_ref() { | ||
let _ = chan.send(PubSubRequest::Subscribe(channel.to_owned())); | ||
MCHSL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}; | ||
}); | ||
} | ||
|
||
fn publish(channel: &str, msg: &str) { | ||
REQUEST_SENDER.with(|cell| { | ||
if let Some(chan) = cell.borrow_mut().as_ref() { | ||
let _ = chan.send(PubSubRequest::Publish(channel.to_owned(), msg.to_owned())); | ||
MCHSL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}; | ||
}); | ||
} | ||
|
||
fn get_messages() -> String { | ||
let mut result: HashMap<String, Vec<String>> = HashMap::new(); | ||
RESPONSE_RECEIVER.with(|cell| { | ||
let opt = cell.borrow_mut(); | ||
if let Some(recv) = opt.as_ref() { | ||
for resp in recv.try_iter() { | ||
match resp { | ||
PubSubResponse::Message(chan, msg) => { | ||
result.entry(chan).or_default().push(msg); | ||
} | ||
PubSubResponse::Disconnected(error) => { | ||
// Pardon the in-band signaling but it's probably the best way to do this | ||
result | ||
.entry(ERROR_CHANNEL.to_owned()) | ||
.or_default() | ||
.push(error); | ||
} | ||
} | ||
} | ||
} | ||
}); | ||
|
||
serde_json::to_string(&result).unwrap_or("{}".to_owned()) | ||
} | ||
|
||
byond_fn! { redis_connect(addr) { | ||
connect(addr).err().map(|e| e.to_string()) | ||
} } | ||
|
||
byond_fn! { redis_disconnect() { | ||
disconnect(); | ||
Some("") | ||
} } | ||
|
||
byond_fn! { redis_subscribe(channel) { | ||
subscribe(channel); | ||
Some("") | ||
} } | ||
|
||
byond_fn! { redis_get_messages() { | ||
Some(get_messages()) | ||
} } | ||
|
||
byond_fn! { redis_publish(channel, message) { | ||
publish(channel, message); | ||
Some("") | ||
} } |
Uh oh!
There was an error while loading. Please reload this page.