Skip to content

Commit 8de6827

Browse files
authored
Merge pull request #86 from romand/i32be
Add i32be stream type
2 parents d188a18 + 7dc4041 commit 8de6827

File tree

10 files changed

+75
-1
lines changed

10 files changed

+75
-1
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
### protobuf to json deserializer, written in Rust
44

5-
`pq` is a tool which deserializes protobuf messages given a set of pre-compiled `.fdset` files. It can understand varint/leb128-delimited streams, and it can connect to Kafka.
5+
`pq` is a tool which deserializes protobuf messages given a set of pre-compiled `.fdset` files. It can understand varint/leb128-delimited/i32be streams, and it can connect to Kafka.
66

77
`pq` will pretty-print when outputting to a tty, but you should pipe it to `jq` for more fully-featured json handling.
88

USAGE.md

+11
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,17 @@ $ pq kafka my_topic --brokers=192.168.0.1:9092 --count 1 --convert varint |\
6161
}
6262
```
6363

64+
Pipe `kafkacat` output to it:
65+
```
66+
$ kafkacat -b 192.168.0.1:9092 -C -u -q -f "%R%s" -t my_topic |\
67+
> pq --msgtype=com.example.dog.Dog --stream i32be
68+
{
69+
"age": 10,
70+
"breed": "gsd",
71+
"temperament": "aggressive"
72+
}
73+
```
74+
6475
### Compile without kafka
6576

6677
To compile `pq` without kafka support, run:

stream-delimit/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ edition = "2018"
1212

1313
[dependencies]
1414
kafka = { version = "0.7", optional = true }
15+
byteorder = "1.3"
1516

1617
[features]
1718
default = []

stream-delimit/src/byte_consumer.rs

100644100755
+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#![deny(missing_docs)]
22

3+
use crate::i32be::consume_single_i32be;
34
use crate::stream::*;
45
use crate::varint::consume_single_varint;
56
use std::io::Read;
@@ -23,6 +24,7 @@ impl<T: Read> Iterator for ByteConsumer<T> {
2324
fn next(&mut self) -> Option<Vec<u8>> {
2425
match self.type_ {
2526
StreamType::Leb128 | StreamType::Varint => consume_single_varint(&mut self.read),
27+
StreamType::I32BE => consume_single_i32be(&mut self.read),
2628
StreamType::Single => {
2729
let ret: Option<Vec<u8>>;
2830
let mut buf = Vec::new();

stream-delimit/src/i32be.rs

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#![deny(missing_docs)]
2+
3+
use byteorder::{BigEndian, ReadBytesExt};
4+
use std::io::Read;
5+
6+
pub fn consume_single_i32be(read: &mut Read) -> Option<Vec<u8>> {
7+
match read.read_i32::<BigEndian>() {
8+
Ok(length) => {
9+
let mut msg_buf = vec![0; length as usize];
10+
match read.read_exact(&mut msg_buf) {
11+
Ok(_) => Some(msg_buf),
12+
Err(_) => None,
13+
}
14+
}
15+
Err(_) => None,
16+
}
17+
}
18+
19+
#[cfg(test)]
20+
mod tests {
21+
use super::*;
22+
use byteorder::WriteBytesExt;
23+
use std::io::Cursor;
24+
25+
fn reads_back(x: i32) {
26+
let mut buf = vec![];
27+
buf.write_i32::<BigEndian>(x).unwrap();
28+
assert_eq!(x, Cursor::new(buf).read_i32::<BigEndian>().unwrap());
29+
}
30+
31+
#[test]
32+
fn test_simple() {
33+
reads_back(1);
34+
}
35+
36+
#[test]
37+
fn test_delimiter_longer() {
38+
reads_back(300);
39+
}
40+
}

stream-delimit/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
extern crate byteorder;
12
#[cfg(feature = "with_kafka")]
23
extern crate kafka;
34

5+
mod i32be;
46
mod varint;
57

68
pub mod error;

stream-delimit/src/stream.rs

100644100755
+4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ pub enum StreamType {
88
Leb128,
99
/// Protobuf messages with leading length encoded in varint
1010
Varint,
11+
/// Protobuf messages with leading length encoded as
12+
/// binary big endian 32-bit signed integer
13+
I32BE,
1114
/// Single protobuf messages with no separators/delimiters
1215
Single,
1316
}
@@ -18,6 +21,7 @@ pub fn str_to_streamtype(input: &str) -> Result<StreamType> {
1821
"single" => Ok(StreamType::Single),
1922
"varint" => Ok(StreamType::Varint),
2023
"leb128" => Ok(StreamType::Leb128),
24+
"i32be" => Ok(StreamType::I32BE),
2125
_ => Err(StreamDelimitError::InvalidStreamTypeError(
2226
input.to_string(),
2327
))?,

tests/samples/dog_i32be_stream

20 Bytes
Binary file not shown.

tests/test_pqrs_bin.rs

+13
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,19 @@ fn test_dog_decode_stream() {
3535
.unwrap();
3636
}
3737

38+
#[test]
39+
fn test_dog_decode_i32be_stream() {
40+
assert_cli::Assert::main_binary()
41+
.with_env(assert_cli::Environment::inherit().insert("FDSET_PATH", get_fdset_dir("fdsets")))
42+
.with_args(&["--msgtype=com.example.dog.Dog", "--stream=i32be"])
43+
.stdin(include_str!("samples/dog_i32be_stream"))
44+
.succeeds()
45+
.and()
46+
.stdout()
47+
.contains("{\"age\":3,\"breed\":\"gsd\",\"temperament\":\"excited\"}")
48+
.unwrap();
49+
}
50+
3851
#[test]
3952
fn test_nonexistent_fdset_dir() {
4053
assert_cli::Assert::main_binary()

0 commit comments

Comments
 (0)