Skip to content

Commit 003ef81

Browse files
authored
feat(bindings/java): implement async ops to pass AsyncStepsTest (#2291)
* refactor(bindings/java): Drop usage of GlobalRef Signed-off-by: tison <[email protected]> * feat(bindings/java): support stat Signed-off-by: tison <[email protected]> * feat(bindings/java): support read Signed-off-by: tison <[email protected]> --------- Signed-off-by: tison <[email protected]>
1 parent be571e7 commit 003ef81

File tree

7 files changed

+220
-47
lines changed

7 files changed

+220
-47
lines changed

bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java

-4
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,8 @@ public Metadata stat(String fileName) {
4747
protected native void disposeInternal(long handle);
4848

4949
private static native long constructor(String schema, Map<String, String> map);
50-
5150
private static native void write(long nativeHandle, String fileName, String content);
52-
5351
private static native String read(long nativeHandle, String fileName);
54-
5552
private static native void delete(long nativeHandle, String fileName);
56-
5753
private static native long stat(long nativeHandle, String file);
5854
}

bindings/java/src/main/java/org/apache/opendal/Metadata.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.apache.opendal;
2121

2222
public class Metadata extends NativeObject {
23-
public Metadata(long nativeHandle) {
23+
protected Metadata(long nativeHandle) {
2424
super(nativeHandle);
2525
}
2626

bindings/java/src/main/java/org/apache/opendal/Operator.java

+51-4
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,71 @@
2020
package org.apache.opendal;
2121

2222
import java.util.Map;
23+
import java.util.UUID;
2324
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.ConcurrentHashMap;
2426

2527
public class Operator extends NativeObject {
28+
private static AsyncRegistry registry() {
29+
return AsyncRegistry.INSTANCE;
30+
}
31+
32+
private enum AsyncRegistry {
33+
INSTANCE;
34+
35+
private final Map<Long, CompletableFuture<?>> registry = new ConcurrentHashMap<>();
2636

37+
@SuppressWarnings("unused") // called by jni-rs
38+
private long requestId() {
39+
final CompletableFuture<?> f = new CompletableFuture<>();
40+
while (true) {
41+
final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
42+
final CompletableFuture<?> prev = registry.putIfAbsent(requestId, f);
43+
if (prev == null) {
44+
return requestId;
45+
}
46+
}
47+
}
48+
49+
private CompletableFuture<?> get(long requestId) {
50+
return registry.get(requestId);
51+
}
52+
53+
@SuppressWarnings("unchecked")
54+
private <T> CompletableFuture<T> take(long requestId) {
55+
final CompletableFuture<?> f = get(requestId);
56+
if (f != null) {
57+
f.whenComplete((r, e) -> registry.remove(requestId));
58+
}
59+
return (CompletableFuture<T>) f;
60+
}
61+
}
2762

2863
public Operator(String schema, Map<String, String> map) {
2964
super(constructor(schema, map));
3065
}
3166

32-
3367
public CompletableFuture<Void> write(String fileName, String content) {
34-
return write(nativeHandle, fileName, content);
68+
final long requestId = write(nativeHandle, fileName, content);
69+
return registry().take(requestId);
70+
}
71+
72+
public CompletableFuture<Metadata> stat(String fileName) {
73+
final long requestId = stat(nativeHandle, fileName);
74+
final CompletableFuture<Long> f = registry().take(requestId);
75+
return f.thenApply(Metadata::new);
76+
}
77+
78+
public CompletableFuture<String> read(String fileName) {
79+
final long requestId = read(nativeHandle, fileName);
80+
return registry().take(requestId);
3581
}
3682

3783
@Override
3884
protected native void disposeInternal(long handle);
3985

4086
private static native long constructor(String schema, Map<String, String> map);
41-
42-
private static native CompletableFuture<Void> write(long nativeHandle, String fileName, String content);
87+
private static native long read(long nativeHandle, String fileName);
88+
private static native long write(long nativeHandle, String fileName, String content);
89+
private static native long stat(long nativeHandle, String file);
4390
}

bindings/java/src/main/java/org/apache/opendal/exception/ODException.java

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
public class ODException extends RuntimeException {
2323
private final Code code;
2424

25+
@SuppressWarnings("unused") // called by jni-rs
2526
public ODException(String code, String message) {
2627
this(Code.valueOf(code), message);
2728
}

bindings/java/src/operator.rs

+147-23
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717

1818
use std::str::FromStr;
1919

20-
use jni::objects::{JClass, JObject, JString, JValue};
21-
use jni::sys::{jlong, jobject};
20+
use jni::objects::{JClass, JObject, JString, JValue, JValueOwned};
21+
use jni::sys::jlong;
2222
use jni::JNIEnv;
2323

2424
use opendal::{Operator, Scheme};
2525

26-
use crate::error::Error;
2726
use crate::{get_current_env, Result};
2827
use crate::{jmap_to_hashmap, RUNTIME};
2928

@@ -69,10 +68,10 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_write(
6968
op: *mut Operator,
7069
file: JString,
7170
content: JString,
72-
) -> jobject {
71+
) -> jlong {
7372
intern_write(&mut env, op, file, content).unwrap_or_else(|e| {
7473
e.throw(&mut env);
75-
JObject::null().into_raw()
74+
0
7675
})
7776
}
7877

@@ -81,41 +80,147 @@ fn intern_write(
8180
op: *mut Operator,
8281
file: JString,
8382
content: JString,
84-
) -> Result<jobject> {
83+
) -> Result<jlong> {
84+
let op = unsafe { &mut *op };
85+
let id = request_id(env)?;
86+
87+
let file = env.get_string(&file)?.to_str()?.to_string();
88+
let content = env.get_string(&content)?.to_str()?.to_string();
89+
90+
let runtime = unsafe { RUNTIME.get_unchecked() };
91+
runtime.spawn(async move {
92+
let result = do_write(op, file, content).await;
93+
complete_future(id, result.map(|_| JValueOwned::Void))
94+
});
95+
96+
Ok(id)
97+
}
98+
99+
async fn do_write(op: &mut Operator, file: String, content: String) -> Result<()> {
100+
Ok(op.write(&file, content).await?)
101+
}
102+
103+
/// # Safety
104+
///
105+
/// This function should not be called before the Operator are ready.
106+
#[no_mangle]
107+
pub unsafe extern "system" fn Java_org_apache_opendal_Operator_stat(
108+
mut env: JNIEnv,
109+
_: JClass,
110+
op: *mut Operator,
111+
file: JString,
112+
) -> jlong {
113+
intern_stat(&mut env, op, file).unwrap_or_else(|e| {
114+
e.throw(&mut env);
115+
0
116+
})
117+
}
118+
119+
fn intern_stat(env: &mut JNIEnv, op: *mut Operator, file: JString) -> Result<jlong> {
85120
let op = unsafe { &mut *op };
86-
let file: String = env.get_string(&file)?.into();
87-
let content: String = env.get_string(&content)?.into();
121+
let id = request_id(env)?;
122+
123+
let file = env.get_string(&file)?.to_str()?.to_string();
124+
125+
let runtime = unsafe { RUNTIME.get_unchecked() };
126+
runtime.spawn(async move {
127+
let result = do_stat(op, file).await;
128+
complete_future(id, result.map(JValueOwned::Long))
129+
});
88130

89-
let class = "java/util/concurrent/CompletableFuture";
90-
let f = env.new_object(class, "()V", &[])?;
131+
Ok(id)
132+
}
133+
134+
async fn do_stat(op: &mut Operator, file: String) -> Result<jlong> {
135+
let metadata = op.stat(&file).await?;
136+
Ok(Box::into_raw(Box::new(metadata)) as jlong)
137+
}
138+
139+
/// # Safety
140+
///
141+
/// This function should not be called before the Operator are ready.
142+
#[no_mangle]
143+
pub unsafe extern "system" fn Java_org_apache_opendal_Operator_read(
144+
mut env: JNIEnv,
145+
_: JClass,
146+
op: *mut Operator,
147+
file: JString,
148+
) -> jlong {
149+
intern_read(&mut env, op, file).unwrap_or_else(|e| {
150+
e.throw(&mut env);
151+
0
152+
})
153+
}
154+
155+
fn intern_read(env: &mut JNIEnv, op: *mut Operator, file: JString) -> Result<jlong> {
156+
let op = unsafe { &mut *op };
157+
let id = request_id(env)?;
91158

92-
// keep the future alive, so that we can complete it later
93-
// but this approach will be limited by global ref table size (65535)
94-
let future = env.new_global_ref(&f)?;
159+
let file = env.get_string(&file)?.to_str()?.to_string();
95160

96161
let runtime = unsafe { RUNTIME.get_unchecked() };
97162
runtime.spawn(async move {
98-
let result = match op.write(&file, content).await {
99-
Ok(()) => Ok(JObject::null()),
100-
Err(err) => Err(Error::from(err)),
101-
};
102-
complete_future(future.as_ref(), result)
163+
let result = do_read(op, file).await;
164+
complete_future(id, result.map(JValueOwned::Object))
103165
});
104166

105-
Ok(f.into_raw())
167+
Ok(id)
168+
}
169+
170+
async fn do_read<'local>(op: &mut Operator, file: String) -> Result<JObject<'local>> {
171+
let content = op.read(&file).await?;
172+
let content = String::from_utf8(content)?;
173+
174+
let env = unsafe { get_current_env() };
175+
let result = env.new_string(content)?;
176+
Ok(result.into())
177+
}
178+
179+
fn request_id(env: &mut JNIEnv) -> Result<jlong> {
180+
let registry = env
181+
.call_static_method(
182+
"org/apache/opendal/Operator",
183+
"registry",
184+
"()Lorg/apache/opendal/Operator$AsyncRegistry;",
185+
&[],
186+
)?
187+
.l()?;
188+
Ok(env.call_method(registry, "requestId", "()J", &[])?.j()?)
106189
}
107190

108-
fn complete_future(future: &JObject, result: Result<JObject>) {
191+
fn make_object<'local>(
192+
env: &mut JNIEnv<'local>,
193+
value: JValueOwned<'local>,
194+
) -> Result<JObject<'local>> {
195+
let o = match value {
196+
JValueOwned::Object(o) => o,
197+
JValueOwned::Byte(_) => env.new_object("java/lang/Long", "(B)V", &[value.borrow()])?,
198+
JValueOwned::Char(_) => env.new_object("java/lang/Char", "(C)V", &[value.borrow()])?,
199+
JValueOwned::Short(_) => env.new_object("java/lang/Short", "(S)V", &[value.borrow()])?,
200+
JValueOwned::Int(_) => env.new_object("java/lang/Integer", "(I)V", &[value.borrow()])?,
201+
JValueOwned::Long(_) => env.new_object("java/lang/Long", "(J)V", &[value.borrow()])?,
202+
JValueOwned::Bool(_) => env.new_object("java/lang/Boolean", "(Z)V", &[value.borrow()])?,
203+
JValueOwned::Float(_) => env.new_object("java/lang/Float", "(F)V", &[value.borrow()])?,
204+
JValueOwned::Double(_) => env.new_object("java/lang/Double", "(D)V", &[value.borrow()])?,
205+
JValueOwned::Void => JObject::null(),
206+
};
207+
Ok(o)
208+
}
209+
210+
fn complete_future(id: jlong, result: Result<JValueOwned>) {
109211
let mut env = unsafe { get_current_env() };
212+
let future = get_future(&mut env, id).unwrap();
110213
match result {
111-
Ok(result) => env
112-
.call_method(
214+
Ok(result) => {
215+
let result = make_object(&mut env, result).unwrap();
216+
env.call_method(
113217
future,
114218
"complete",
115219
"(Ljava/lang/Object;)Z",
116220
&[JValue::Object(&result)],
117221
)
118-
.unwrap(),
222+
.unwrap()
223+
}
119224
Err(err) => {
120225
let exception = err.to_exception(&mut env).unwrap();
121226
env.call_method(
@@ -128,3 +233,22 @@ fn complete_future(future: &JObject, result: Result<JObject>) {
128233
}
129234
};
130235
}
236+
237+
fn get_future<'local>(env: &mut JNIEnv<'local>, id: jlong) -> Result<JObject<'local>> {
238+
let registry = env
239+
.call_static_method(
240+
"org/apache/opendal/Operator",
241+
"registry",
242+
"()Lorg/apache/opendal/Operator$AsyncRegistry;",
243+
&[],
244+
)?
245+
.l()?;
246+
Ok(env
247+
.call_method(
248+
registry,
249+
"get",
250+
"(J)Ljava/util/concurrent/CompletableFuture;",
251+
&[JValue::Long(id)],
252+
)?
253+
.l()?)
254+
}

bindings/java/src/test/java/org/apache/opendal/AsyncStepsTest.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -26,42 +26,47 @@
2626
import java.util.Map;
2727
import java.util.concurrent.CompletableFuture;
2828

29+
import static org.junit.jupiter.api.Assertions.assertEquals;
2930
import static org.junit.jupiter.api.Assertions.assertFalse;
31+
import static org.junit.jupiter.api.Assertions.assertNotNull;
3032
import static org.junit.jupiter.api.Assertions.assertTrue;
3133

3234
public class AsyncStepsTest {
33-
Operator operator;
35+
Operator op;
3436

3537
@Given("A new OpenDAL Async Operator")
3638
public void a_new_open_dal_async_operator() {
3739
Map<String, String> params = new HashMap<>();
3840
params.put("root", "/tmp");
39-
operator = new Operator("Memory", params);
41+
op = new Operator("Memory", params);
4042
}
4143

4244
@When("Async write path {string} with content {string}")
4345
public void async_write_path_test_with_content_hello_world(String fileName, String content) {
44-
CompletableFuture<Void> f = operator.write(fileName, content);
45-
46-
f.join();
47-
48-
assertTrue(f.isDone());
49-
assertFalse(f.isCompletedExceptionally());
46+
op.write(fileName, content).join();
5047
}
5148

5249
@Then("The async file {string} should exist")
5350
public void the_async_file_test_should_exist(String fileName) {
51+
Metadata metadata = op.stat(fileName).join();
52+
assertNotNull(metadata);
5453
}
5554

5655
@Then("The async file {string} entry mode must be file")
5756
public void the_async_file_test_entry_mode_must_be_file(String fileName) {
57+
Metadata metadata = op.stat(fileName).join();
58+
assertTrue(metadata.isFile());
5859
}
5960

6061
@Then("The async file {string} content length must be {int}")
6162
public void the_async_file_test_content_length_must_be_13(String fileName, int length) {
63+
Metadata metadata = op.stat(fileName).join();
64+
assertEquals(metadata.getContentLength(), length);
6265
}
6366

6467
@Then("The async file {string} must have content {string}")
6568
public void the_async_file_test_must_have_content_hello_world(String fileName, String content) {
69+
String readContent = op.read(fileName).join();
70+
assertEquals(content, readContent);
6671
}
6772
}

0 commit comments

Comments
 (0)