Skip to content

Commit 7a2a1f3

Browse files
authored
Adding missing QoS param to Get operations (#198)
1 parent 88db7d4 commit 7a2a1f3

File tree

3 files changed

+26
-4
lines changed

3 files changed

+26
-4
lines changed

zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,10 @@ internal class JNISession {
286286
options.attachment?.into()?.bytes,
287287
options.payload?.into()?.bytes,
288288
options.encoding?.id ?: Encoding.defaultEncoding().id,
289-
options.encoding?.schema
289+
options.encoding?.schema,
290+
options.qos.congestionControl.value,
291+
options.qos.priority.value,
292+
options.qos.express
290293
)
291294
}
292295

@@ -348,12 +351,14 @@ internal class JNISession {
348351
options.attachment?.into()?.bytes,
349352
options.payload?.into()?.bytes,
350353
options.encoding?.id ?: Encoding.defaultEncoding().id,
351-
options.encoding?.schema
354+
options.encoding?.schema,
355+
options.qos.congestionControl.value,
356+
options.qos.priority.value,
357+
options.qos.express
352358
)
353359
return handler.receiver()
354360
}
355361

356-
357362
@Throws(ZError::class)
358363
fun declareKeyExpr(keyExpr: String): KeyExpr {
359364
val ptr = declareKeyExprViaJNI(sessionPtr.get(), keyExpr)
@@ -501,6 +506,9 @@ internal class JNISession {
501506
payload: ByteArray?,
502507
encodingId: Int,
503508
encodingSchema: String?,
509+
congestionControl: Int,
510+
priority: Int,
511+
express: Boolean,
504512
)
505513

506514
@Throws(ZError::class)

zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package io.zenoh.query
1717
import io.zenoh.bytes.Encoding
1818
import io.zenoh.bytes.IntoZBytes
1919
import io.zenoh.bytes.ZBytes
20+
import io.zenoh.qos.QoS
2021
import java.time.Duration
2122

2223
/**
@@ -28,14 +29,16 @@ import java.time.Duration
2829
* @param payload Optional payload.
2930
* @param encoding Encoding of the payload.
3031
* @param attachment Optional attachment.
32+
* @param qos The intended [QoS] for the query.
3133
*/
3234
data class GetOptions(
3335
var timeout: Duration = Duration.ofMillis(10000),
3436
var target: QueryTarget = QueryTarget.BEST_MATCHING,
3537
var consolidation: ConsolidationMode = ConsolidationMode.AUTO,
3638
var payload: IntoZBytes? = null,
3739
var encoding: Encoding? = null,
38-
var attachment: IntoZBytes? = null
40+
var attachment: IntoZBytes? = null,
41+
var qos: QoS = QoS.defaultQoS
3942
) {
4043
fun setPayload(payload: String) = apply { this.payload = ZBytes.from(payload) }
4144
fun setAttachment(attachment: String) = apply { this.attachment = ZBytes.from(attachment) }

zenoh-jni/src/session.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -854,6 +854,9 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_undeclareKeyExprViaJNI(
854854
/// - `payload`: Optional payload for the query.
855855
/// - `encoding_id`: The encoding of the payload.
856856
/// - `encoding_schema`: The encoding schema of the payload, may be null.
857+
/// - `congestion_control`: The ordinal value of the congestion control enum value.
858+
/// - `priority`: The ordinal value of the priority enum value.
859+
/// - `is_express`: The boolean express value of the QoS provided.
857860
///
858861
/// Safety:
859862
/// - The function is marked as unsafe due to raw pointer manipulation and JNI interaction.
@@ -883,6 +886,9 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI(
883886
payload: /*nullable*/ JByteArray,
884887
encoding_id: jint,
885888
encoding_schema: /*nullable*/ JString,
889+
congestion_control: jint,
890+
priority: jint,
891+
is_express: jboolean,
886892
) {
887893
let session = Arc::from_raw(session_ptr);
888894
let _ = || -> ZResult<()> {
@@ -893,6 +899,8 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI(
893899
let query_target = decode_query_target(target)?;
894900
let consolidation = decode_consolidation(consolidation)?;
895901
let timeout = Duration::from_millis(timeout_ms as u64);
902+
let congestion_control = decode_congestion_control(congestion_control)?;
903+
let priority = decode_priority(priority)?;
896904
let on_close = load_on_close(&java_vm, on_close_global_ref);
897905
let selector_params = if selector_params.is_null() {
898906
String::new()
@@ -902,6 +910,9 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI(
902910
let selector = Selector::owned(&key_expr, selector_params);
903911
let mut get_builder = session
904912
.get(selector)
913+
.congestion_control(congestion_control)
914+
.priority(priority)
915+
.express(is_express != 0)
905916
.callback(move |reply| {
906917
|| -> ZResult<()> {
907918
on_close.noop(); // Does nothing, but moves `on_close` inside the closure so it gets destroyed with the closure

0 commit comments

Comments
 (0)