Skip to content

Commit c2281c4

Browse files
authored
[ISSUE #1228]🚀Implement Java MessageDecoder class encodeUniquely and encode method (#1229)
1 parent f06b4ff commit c2281c4

File tree

7 files changed

+356
-25
lines changed

7 files changed

+356
-25
lines changed

‎rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ use rocketmq_rust::ArcMut;
5555
use tokio::runtime::Handle;
5656
use tokio::sync::RwLock;
5757
use tokio::sync::Semaphore;
58-
use tokio_util::bytes::Bytes;
5958
use tracing::warn;
6059

6160
use crate::base::client_config::ClientConfig;
@@ -1246,7 +1245,7 @@ impl DefaultMQProducerImpl {
12461245
.compress(body, self.producer_config.compress_level());
12471246
if let Ok(data) = data {
12481247
//store the compressed data
1249-
msg.set_compressed_body_mut(Bytes::from(data));
1248+
msg.set_compressed_body_mut(data);
12501249
return true;
12511250
}
12521251
}

‎rocketmq-common/src/common/compression/compression_type.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -49,37 +49,37 @@ impl CompressionType {
4949
}
5050
}
5151

52-
pub fn compression(&self, data: &Bytes) -> Bytes {
52+
pub fn compression(&self, data: &[u8]) -> Bytes {
5353
match self {
5454
CompressionType::LZ4 => {
55-
let compressed = compress_prepend_size(data.chunk());
55+
let compressed = compress_prepend_size(data);
5656
Bytes::from(compressed)
5757
}
5858
CompressionType::Zstd => {
59-
let compressed = zstd::encode_all(data.clone().reader(), 5).unwrap();
59+
let compressed = zstd::encode_all(data.reader(), 5).unwrap();
6060
Bytes::from(compressed)
6161
}
6262
CompressionType::Zlib => {
6363
let mut zlib_encoder = ZlibEncoder::new(Vec::new(), Compression::default());
64-
let _ = zlib_encoder.write_all(data.chunk());
64+
let _ = zlib_encoder.write_all(data);
6565
let result = zlib_encoder.finish().unwrap();
6666
Bytes::from(result)
6767
}
6868
}
6969
}
7070

71-
pub fn decompression(&self, data: &Bytes) -> Bytes {
71+
pub fn decompression(&self, data: &[u8]) -> Bytes {
7272
match self {
7373
CompressionType::LZ4 => {
74-
let compressed = decompress_size_prepended(data.chunk()).unwrap();
74+
let compressed = decompress_size_prepended(data).unwrap();
7575
Bytes::from(compressed)
7676
}
7777
CompressionType::Zstd => {
78-
let compressed = zstd::decode_all(data.clone().reader()).unwrap();
78+
let compressed = zstd::decode_all(data.reader()).unwrap();
7979
Bytes::from(compressed)
8080
}
8181
CompressionType::Zlib => {
82-
let mut zlib_encoder = ZlibDecoder::new(data.clone().reader());
82+
let mut zlib_encoder = ZlibDecoder::new(data.reader());
8383
let mut decompressed_data = Vec::new();
8484
zlib_encoder.read_to_end(&mut decompressed_data).unwrap();
8585
Bytes::from(decompressed_data)

‎rocketmq-common/src/common/compression/compressor.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use bytes::Bytes;
18+
19+
use crate::Result;
20+
1721
pub trait Compressor {
1822
/// Compress message by different compressor.
1923
///
@@ -25,7 +29,7 @@ pub trait Compressor {
2529
/// # Returns
2630
///
2731
/// Compressed byte data or an `std::io::Error`.
28-
fn compress(&self, src: &[u8], level: i32) -> Result<Vec<u8>, std::io::Error>;
32+
fn compress(&self, src: &[u8], level: i32) -> Result<Bytes>;
2933

3034
/// Decompress message by different compressor.
3135
///
@@ -36,5 +40,5 @@ pub trait Compressor {
3640
/// # Returns
3741
///
3842
/// Decompressed byte data or an `std::io::Error`.
39-
fn decompress(&self, src: &[u8]) -> Result<Vec<u8>, std::io::Error>;
43+
fn decompress(&self, src: &[u8]) -> Result<Bytes>;
4044
}

‎rocketmq-common/src/common/compression/lz4_compressor.rs

+7-5
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,20 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
use std::io::Error;
17+
use bytes::Bytes;
1818

19+
use crate::common::compression::compression_type::CompressionType;
1920
use crate::common::compression::compressor::Compressor;
21+
use crate::Result;
2022

2123
pub struct Lz4Compressor;
2224

2325
impl Compressor for Lz4Compressor {
24-
fn compress(&self, src: &[u8], level: i32) -> Result<Vec<u8>, Error> {
25-
todo!()
26+
fn compress(&self, src: &[u8], level: i32) -> Result<Bytes> {
27+
Ok(CompressionType::LZ4.compression(src))
2628
}
2729

28-
fn decompress(&self, src: &[u8]) -> Result<Vec<u8>, Error> {
29-
todo!()
30+
fn decompress(&self, src: &[u8]) -> Result<Bytes> {
31+
Ok(CompressionType::LZ4.decompression(src))
3032
}
3133
}

‎rocketmq-common/src/common/compression/zlib_compressor.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,20 @@
1616
*/
1717
use std::io::Error;
1818

19+
use bytes::Bytes;
20+
21+
use crate::common::compression::compression_type::CompressionType;
1922
use crate::common::compression::compressor::Compressor;
23+
use crate::Result;
2024

2125
pub struct ZlibCompressor;
2226

2327
impl Compressor for ZlibCompressor {
24-
fn compress(&self, src: &[u8], level: i32) -> Result<Vec<u8>, Error> {
25-
todo!()
28+
fn compress(&self, src: &[u8], level: i32) -> Result<Bytes> {
29+
Ok(CompressionType::Zlib.compression(src))
2630
}
2731

28-
fn decompress(&self, src: &[u8]) -> Result<Vec<u8>, Error> {
29-
todo!()
32+
fn decompress(&self, src: &[u8]) -> Result<Bytes> {
33+
Ok(CompressionType::Zlib.decompression(src))
3034
}
3135
}

‎rocketmq-common/src/common/compression/zstd_compressor.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,20 @@
1616
*/
1717
use std::io::Error;
1818

19+
use bytes::Bytes;
20+
21+
use crate::common::compression::compression_type::CompressionType;
1922
use crate::common::compression::compressor::Compressor;
23+
use crate::Result;
2024

2125
pub struct ZstdCompressor;
2226

2327
impl Compressor for ZstdCompressor {
24-
fn compress(&self, src: &[u8], level: i32) -> Result<Vec<u8>, Error> {
25-
todo!()
28+
fn compress(&self, src: &[u8], level: i32) -> Result<Bytes> {
29+
Ok(CompressionType::Zstd.compression(src))
2630
}
2731

28-
fn decompress(&self, src: &[u8]) -> Result<Vec<u8>, Error> {
29-
todo!()
32+
fn decompress(&self, src: &[u8]) -> Result<Bytes> {
33+
Ok(CompressionType::Zstd.decompression(src))
3034
}
3135
}

0 commit comments

Comments
 (0)