-
Notifications
You must be signed in to change notification settings - Fork 106
Adds Redis Pub/Sub integration #80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 2 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
279c90d
add redis pubsub library
MCHSL d5d67ff
fix typo
MCHSL d628fae
bound channels to 1000 messages, bump version to 0.6.0
MCHSL bdaaa97
update readme
MCHSL a360fcd
discard overflowing messages
MCHSL b9ab17d
Merge branch 'master' of https://github.com/tgstation/rust-g into redis
MCHSL 17c6251
increase robustness
MCHSL 1fc0a22
actually attempt connecting in the connect() method
MCHSL be5d478
alphabetize, add questionable error handling
MCHSL File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
#define RUSTG_REDIS_ERROR_CHANNEL "RUSTG_REDIS_ERROR_CHANNEL" | ||
|
||
#define rustg_redis_connect(addr) call(RUST_G, "redis_connect")(addr) | ||
/proc/rustg_redis_disconnect() return call(RUST_G, "redis_disconnect")() | ||
#define rustg_redis_subscribe(channel) call(RUST_G, "redis_subscribe")(channel) | ||
/proc/rustg_redis_get_messages() return call(RUST_G, "redis_get_messages")() | ||
#define rustg_redis_publish(channel, message) call(RUST_G, "redis_publish")(channel, message) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
use redis::{Client, Commands, RedisError}; | ||
use std::cell::RefCell; | ||
use std::collections::HashMap; | ||
use std::thread; | ||
use std::time::Duration; | ||
|
||
static ERROR_CHANNEL: &'static str = "RUSTG_REDIS_ERROR_CHANNEL"; | ||
|
||
thread_local! { | ||
static REQUEST_SENDER: RefCell<Option<flume::Sender<PubSubRequest>>> = RefCell::new(None); | ||
static RESPONSE_RECEIVER: RefCell<Option<flume::Receiver<PubSubResponse>>> = RefCell::new(None); | ||
} | ||
|
||
enum PubSubRequest { | ||
Quit, | ||
Subscribe(String), | ||
Publish(String, String), | ||
} | ||
|
||
// response might not be a good name, since those are not sent in response to requests | ||
enum PubSubResponse { | ||
Disconnected(String), | ||
Message(String, String), | ||
} | ||
|
||
fn handle_redis_inner( | ||
client: Client, | ||
control: flume::Receiver<PubSubRequest>, | ||
out: flume::Sender<PubSubResponse>, | ||
) -> Result<(), RedisError> { | ||
let mut conn = client.get_connection()?; | ||
let mut pub_conn = client.get_connection()?; | ||
let mut pubsub = conn.as_pubsub(); | ||
let _ = pubsub.set_read_timeout(Some(Duration::from_secs(1))); | ||
MCHSL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
'outer: loop { | ||
for req in control.try_iter() { | ||
match req { | ||
PubSubRequest::Quit => break 'outer, | ||
PubSubRequest::Subscribe(chan) => { | ||
pubsub.subscribe(&chan)?; | ||
} | ||
PubSubRequest::Publish(chan, msg) => { | ||
// kinda lame how PubSub doesn't have the Pub | ||
pub_conn.publish(&chan, &msg)? | ||
} | ||
} | ||
} | ||
|
||
if let Some(msg) = match pubsub.get_message() { | ||
Ok(msg) => Some(msg), | ||
Err(e) => { | ||
if e.is_timeout() { | ||
None | ||
} else { | ||
return Err(e); | ||
} | ||
} | ||
} { | ||
let chan = msg.get_channel_name().to_owned(); | ||
let data: String = msg.get_payload().unwrap_or_default(); | ||
let _ = out.send(PubSubResponse::Message(chan, data)); | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
fn handle_redis( | ||
client: Client, | ||
control: flume::Receiver<PubSubRequest>, | ||
out: flume::Sender<PubSubResponse>, | ||
) { | ||
let out_copy = out.clone(); | ||
if let Err(e) = handle_redis_inner(client, control, out) { | ||
let _ = out_copy.send(PubSubResponse::Disconnected(e.to_string())); | ||
} | ||
} | ||
|
||
fn connect(addr: &str) -> Result<(), RedisError> { | ||
let client = redis::Client::open(addr)?; | ||
let (c_sender, c_receiver) = flume::unbounded(); | ||
let (o_sender, o_receiver) = flume::unbounded(); | ||
REQUEST_SENDER.with(|cell| cell.replace(Some(c_sender))); | ||
RESPONSE_RECEIVER.with(|cell| cell.replace(Some(o_receiver))); | ||
thread::spawn(|| handle_redis(client, c_receiver, o_sender)); | ||
Ok(()) | ||
} | ||
|
||
fn disconnect() { | ||
REQUEST_SENDER.with(|cell| { | ||
if let Some(chan) = cell.borrow_mut().as_ref() { | ||
let _ = chan.send(PubSubRequest::Quit); | ||
} | ||
cell.replace(None); | ||
}); | ||
RESPONSE_RECEIVER.with(|cell| { | ||
cell.replace(None); | ||
}); | ||
} | ||
|
||
fn subscribe(channel: &str) { | ||
REQUEST_SENDER.with(|cell| { | ||
if let Some(chan) = cell.borrow_mut().as_ref() { | ||
let _ = chan.send(PubSubRequest::Subscribe(channel.to_owned())); | ||
MCHSL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}; | ||
}); | ||
} | ||
|
||
fn publish(channel: &str, msg: &str) { | ||
REQUEST_SENDER.with(|cell| { | ||
if let Some(chan) = cell.borrow_mut().as_ref() { | ||
let _ = chan.send(PubSubRequest::Publish(channel.to_owned(), msg.to_owned())); | ||
MCHSL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}; | ||
}); | ||
} | ||
|
||
fn get_messages() -> String { | ||
let mut result: HashMap<String, Vec<String>> = HashMap::new(); | ||
RESPONSE_RECEIVER.with(|cell| { | ||
let opt = cell.borrow_mut(); | ||
if let Some(recv) = opt.as_ref() { | ||
for resp in recv.try_iter() { | ||
match resp { | ||
PubSubResponse::Message(chan, msg) => { | ||
result.entry(chan).or_default().push(msg); | ||
} | ||
PubSubResponse::Disconnected(error) => { | ||
// Pardon the in-band signaling but it's probably the best way to do this | ||
result | ||
.entry(ERROR_CHANNEL.to_owned()) | ||
.or_default() | ||
.push(error); | ||
} | ||
} | ||
} | ||
} | ||
}); | ||
|
||
serde_json::to_string(&result).unwrap_or("{}".to_owned()) | ||
} | ||
|
||
byond_fn! { redis_connect(addr) { | ||
connect(addr).err().map(|e| e.to_string()) | ||
} } | ||
|
||
byond_fn! { redis_disconnect() { | ||
disconnect(); | ||
Some("") | ||
} } | ||
|
||
byond_fn! { redis_subscribe(channel) { | ||
subscribe(channel); | ||
Some("") | ||
} } | ||
|
||
byond_fn! { redis_get_messages() { | ||
Some(get_messages()) | ||
} } | ||
|
||
byond_fn! { redis_publish(channel, message) { | ||
publish(channel, message); | ||
Some("") | ||
} } |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also you might want to bump the version of the package itself
version = "0.5.0"
I guess with semver this would be
version = "0.6.0"
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why would you include the patch version
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assumed that was standard with semver. RUSTG has always been 0.x.0