diff --git a/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java b/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java index fbdf25752f32..cb670ae0b9f3 100644 --- a/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java +++ b/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java @@ -47,12 +47,8 @@ public Metadata stat(String fileName) { protected native void disposeInternal(long handle); private static native long constructor(String schema, Map map); - private static native void write(long nativeHandle, String fileName, String content); - private static native String read(long nativeHandle, String fileName); - private static native void delete(long nativeHandle, String fileName); - private static native long stat(long nativeHandle, String file); } diff --git a/bindings/java/src/main/java/org/apache/opendal/Metadata.java b/bindings/java/src/main/java/org/apache/opendal/Metadata.java index 42465664068e..0a815a290012 100644 --- a/bindings/java/src/main/java/org/apache/opendal/Metadata.java +++ b/bindings/java/src/main/java/org/apache/opendal/Metadata.java @@ -20,7 +20,7 @@ package org.apache.opendal; public class Metadata extends NativeObject { - public Metadata(long nativeHandle) { + protected Metadata(long nativeHandle) { super(nativeHandle); } diff --git a/bindings/java/src/main/java/org/apache/opendal/Operator.java b/bindings/java/src/main/java/org/apache/opendal/Operator.java index 86b8066dc11d..1a615e5615a2 100644 --- a/bindings/java/src/main/java/org/apache/opendal/Operator.java +++ b/bindings/java/src/main/java/org/apache/opendal/Operator.java @@ -20,24 +20,71 @@ package org.apache.opendal; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; public class Operator extends NativeObject { + private static AsyncRegistry registry() { + return AsyncRegistry.INSTANCE; + } + + private enum AsyncRegistry { + INSTANCE; + + private final Map> registry = new ConcurrentHashMap<>(); + @SuppressWarnings("unused") // called by jni-rs + private long requestId() { + final CompletableFuture f = new CompletableFuture<>(); + while (true) { + final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits()); + final CompletableFuture prev = registry.putIfAbsent(requestId, f); + if (prev == null) { + return requestId; + } + } + } + + private CompletableFuture get(long requestId) { + return registry.get(requestId); + } + + @SuppressWarnings("unchecked") + private CompletableFuture take(long requestId) { + final CompletableFuture f = get(requestId); + if (f != null) { + f.whenComplete((r, e) -> registry.remove(requestId)); + } + return (CompletableFuture) f; + } + } public Operator(String schema, Map map) { super(constructor(schema, map)); } - public CompletableFuture write(String fileName, String content) { - return write(nativeHandle, fileName, content); + final long requestId = write(nativeHandle, fileName, content); + return registry().take(requestId); + } + + public CompletableFuture stat(String fileName) { + final long requestId = stat(nativeHandle, fileName); + final CompletableFuture f = registry().take(requestId); + return f.thenApply(Metadata::new); + } + + public CompletableFuture read(String fileName) { + final long requestId = read(nativeHandle, fileName); + return registry().take(requestId); } @Override protected native void disposeInternal(long handle); private static native long constructor(String schema, Map map); - - private static native CompletableFuture write(long nativeHandle, String fileName, String content); + private static native long read(long nativeHandle, String fileName); + private static native long write(long nativeHandle, String fileName, String content); + private static native long stat(long nativeHandle, String file); } diff --git a/bindings/java/src/main/java/org/apache/opendal/exception/ODException.java b/bindings/java/src/main/java/org/apache/opendal/exception/ODException.java index 5a428673e65b..0c8b841908e6 100644 --- a/bindings/java/src/main/java/org/apache/opendal/exception/ODException.java +++ b/bindings/java/src/main/java/org/apache/opendal/exception/ODException.java @@ -22,6 +22,7 @@ public class ODException extends RuntimeException { private final Code code; + @SuppressWarnings("unused") // called by jni-rs public ODException(String code, String message) { this(Code.valueOf(code), message); } diff --git a/bindings/java/src/operator.rs b/bindings/java/src/operator.rs index 771d4e234f59..27a35ae101b9 100644 --- a/bindings/java/src/operator.rs +++ b/bindings/java/src/operator.rs @@ -17,13 +17,12 @@ use std::str::FromStr; -use jni::objects::{JClass, JObject, JString, JValue}; -use jni::sys::{jlong, jobject}; +use jni::objects::{JClass, JObject, JString, JValue, JValueOwned}; +use jni::sys::jlong; use jni::JNIEnv; use opendal::{Operator, Scheme}; -use crate::error::Error; use crate::{get_current_env, Result}; use crate::{jmap_to_hashmap, RUNTIME}; @@ -69,10 +68,10 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_write( op: *mut Operator, file: JString, content: JString, -) -> jobject { +) -> jlong { intern_write(&mut env, op, file, content).unwrap_or_else(|e| { e.throw(&mut env); - JObject::null().into_raw() + 0 }) } @@ -81,41 +80,147 @@ fn intern_write( op: *mut Operator, file: JString, content: JString, -) -> Result { +) -> Result { + let op = unsafe { &mut *op }; + let id = request_id(env)?; + + let file = env.get_string(&file)?.to_str()?.to_string(); + let content = env.get_string(&content)?.to_str()?.to_string(); + + let runtime = unsafe { RUNTIME.get_unchecked() }; + runtime.spawn(async move { + let result = do_write(op, file, content).await; + complete_future(id, result.map(|_| JValueOwned::Void)) + }); + + Ok(id) +} + +async fn do_write(op: &mut Operator, file: String, content: String) -> Result<()> { + Ok(op.write(&file, content).await?) +} + +/// # Safety +/// +/// This function should not be called before the Operator are ready. +#[no_mangle] +pub unsafe extern "system" fn Java_org_apache_opendal_Operator_stat( + mut env: JNIEnv, + _: JClass, + op: *mut Operator, + file: JString, +) -> jlong { + intern_stat(&mut env, op, file).unwrap_or_else(|e| { + e.throw(&mut env); + 0 + }) +} + +fn intern_stat(env: &mut JNIEnv, op: *mut Operator, file: JString) -> Result { let op = unsafe { &mut *op }; - let file: String = env.get_string(&file)?.into(); - let content: String = env.get_string(&content)?.into(); + let id = request_id(env)?; + + let file = env.get_string(&file)?.to_str()?.to_string(); + + let runtime = unsafe { RUNTIME.get_unchecked() }; + runtime.spawn(async move { + let result = do_stat(op, file).await; + complete_future(id, result.map(JValueOwned::Long)) + }); - let class = "java/util/concurrent/CompletableFuture"; - let f = env.new_object(class, "()V", &[])?; + Ok(id) +} + +async fn do_stat(op: &mut Operator, file: String) -> Result { + let metadata = op.stat(&file).await?; + Ok(Box::into_raw(Box::new(metadata)) as jlong) +} + +/// # Safety +/// +/// This function should not be called before the Operator are ready. +#[no_mangle] +pub unsafe extern "system" fn Java_org_apache_opendal_Operator_read( + mut env: JNIEnv, + _: JClass, + op: *mut Operator, + file: JString, +) -> jlong { + intern_read(&mut env, op, file).unwrap_or_else(|e| { + e.throw(&mut env); + 0 + }) +} + +fn intern_read(env: &mut JNIEnv, op: *mut Operator, file: JString) -> Result { + let op = unsafe { &mut *op }; + let id = request_id(env)?; - // keep the future alive, so that we can complete it later - // but this approach will be limited by global ref table size (65535) - let future = env.new_global_ref(&f)?; + let file = env.get_string(&file)?.to_str()?.to_string(); let runtime = unsafe { RUNTIME.get_unchecked() }; runtime.spawn(async move { - let result = match op.write(&file, content).await { - Ok(()) => Ok(JObject::null()), - Err(err) => Err(Error::from(err)), - }; - complete_future(future.as_ref(), result) + let result = do_read(op, file).await; + complete_future(id, result.map(JValueOwned::Object)) }); - Ok(f.into_raw()) + Ok(id) +} + +async fn do_read<'local>(op: &mut Operator, file: String) -> Result> { + let content = op.read(&file).await?; + let content = String::from_utf8(content)?; + + let env = unsafe { get_current_env() }; + let result = env.new_string(content)?; + Ok(result.into()) +} + +fn request_id(env: &mut JNIEnv) -> Result { + let registry = env + .call_static_method( + "org/apache/opendal/Operator", + "registry", + "()Lorg/apache/opendal/Operator$AsyncRegistry;", + &[], + )? + .l()?; + Ok(env.call_method(registry, "requestId", "()J", &[])?.j()?) } -fn complete_future(future: &JObject, result: Result) { +fn make_object<'local>( + env: &mut JNIEnv<'local>, + value: JValueOwned<'local>, +) -> Result> { + let o = match value { + JValueOwned::Object(o) => o, + JValueOwned::Byte(_) => env.new_object("java/lang/Long", "(B)V", &[value.borrow()])?, + JValueOwned::Char(_) => env.new_object("java/lang/Char", "(C)V", &[value.borrow()])?, + JValueOwned::Short(_) => env.new_object("java/lang/Short", "(S)V", &[value.borrow()])?, + JValueOwned::Int(_) => env.new_object("java/lang/Integer", "(I)V", &[value.borrow()])?, + JValueOwned::Long(_) => env.new_object("java/lang/Long", "(J)V", &[value.borrow()])?, + JValueOwned::Bool(_) => env.new_object("java/lang/Boolean", "(Z)V", &[value.borrow()])?, + JValueOwned::Float(_) => env.new_object("java/lang/Float", "(F)V", &[value.borrow()])?, + JValueOwned::Double(_) => env.new_object("java/lang/Double", "(D)V", &[value.borrow()])?, + JValueOwned::Void => JObject::null(), + }; + Ok(o) +} + +fn complete_future(id: jlong, result: Result) { let mut env = unsafe { get_current_env() }; + let future = get_future(&mut env, id).unwrap(); match result { - Ok(result) => env - .call_method( + Ok(result) => { + let result = make_object(&mut env, result).unwrap(); + env.call_method( future, "complete", "(Ljava/lang/Object;)Z", &[JValue::Object(&result)], ) - .unwrap(), + .unwrap() + } Err(err) => { let exception = err.to_exception(&mut env).unwrap(); env.call_method( @@ -128,3 +233,22 @@ fn complete_future(future: &JObject, result: Result) { } }; } + +fn get_future<'local>(env: &mut JNIEnv<'local>, id: jlong) -> Result> { + let registry = env + .call_static_method( + "org/apache/opendal/Operator", + "registry", + "()Lorg/apache/opendal/Operator$AsyncRegistry;", + &[], + )? + .l()?; + Ok(env + .call_method( + registry, + "get", + "(J)Ljava/util/concurrent/CompletableFuture;", + &[JValue::Long(id)], + )? + .l()?) +} diff --git a/bindings/java/src/test/java/org/apache/opendal/AsyncStepsTest.java b/bindings/java/src/test/java/org/apache/opendal/AsyncStepsTest.java index 640f66c15e08..a9e3c6e454cf 100644 --- a/bindings/java/src/test/java/org/apache/opendal/AsyncStepsTest.java +++ b/bindings/java/src/test/java/org/apache/opendal/AsyncStepsTest.java @@ -26,42 +26,47 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class AsyncStepsTest { - Operator operator; + Operator op; @Given("A new OpenDAL Async Operator") public void a_new_open_dal_async_operator() { Map params = new HashMap<>(); params.put("root", "/tmp"); - operator = new Operator("Memory", params); + op = new Operator("Memory", params); } @When("Async write path {string} with content {string}") public void async_write_path_test_with_content_hello_world(String fileName, String content) { - CompletableFuture f = operator.write(fileName, content); - - f.join(); - - assertTrue(f.isDone()); - assertFalse(f.isCompletedExceptionally()); + op.write(fileName, content).join(); } @Then("The async file {string} should exist") public void the_async_file_test_should_exist(String fileName) { + Metadata metadata = op.stat(fileName).join(); + assertNotNull(metadata); } @Then("The async file {string} entry mode must be file") public void the_async_file_test_entry_mode_must_be_file(String fileName) { + Metadata metadata = op.stat(fileName).join(); + assertTrue(metadata.isFile()); } @Then("The async file {string} content length must be {int}") public void the_async_file_test_content_length_must_be_13(String fileName, int length) { + Metadata metadata = op.stat(fileName).join(); + assertEquals(metadata.getContentLength(), length); } @Then("The async file {string} must have content {string}") public void the_async_file_test_must_have_content_hello_world(String fileName, String content) { + String readContent = op.read(fileName).join(); + assertEquals(content, readContent); } } diff --git a/bindings/java/src/test/java/org/apache/opendal/StepsTest.java b/bindings/java/src/test/java/org/apache/opendal/StepsTest.java index ef26d1289929..d15f2dcc8652 100644 --- a/bindings/java/src/test/java/org/apache/opendal/StepsTest.java +++ b/bindings/java/src/test/java/org/apache/opendal/StepsTest.java @@ -30,42 +30,42 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class StepsTest { - BlockingOperator blockingOperator; + BlockingOperator op; @Given("A new OpenDAL Blocking Operator") public void a_new_open_dal_blocking_operator() { Map params = new HashMap<>(); params.put("root", "/tmp"); - this.blockingOperator = new BlockingOperator("Memory", params); + op = new BlockingOperator("Memory", params); } @When("Blocking write path {string} with content {string}") public void blocking_write_path_test_with_content_hello_world(String fileName, String content) { - this.blockingOperator.write(fileName, content); + op.write(fileName, content); } @Then("The blocking file {string} should exist") public void the_blocking_file_test_should_exist(String fileName) { - Metadata metadata = this.blockingOperator.stat(fileName); + Metadata metadata = op.stat(fileName); assertNotNull(metadata); } @Then("The blocking file {string} entry mode must be file") public void the_blocking_file_test_entry_mode_must_be_file(String fileName) { - Metadata metadata = this.blockingOperator.stat(fileName); + Metadata metadata = op.stat(fileName); assertTrue(metadata.isFile()); } @Then("The blocking file {string} content length must be {int}") public void the_blocking_file_test_content_length_must_be_13(String fileName, int length) { - Metadata metadata = this.blockingOperator.stat(fileName); + Metadata metadata = op.stat(fileName); assertEquals(metadata.getContentLength(), length); } @Then("The blocking file {string} must have content {string}") public void the_blocking_file_test_must_have_content_hello_world(String fileName, String content) { - String readContent = this.blockingOperator.read(fileName); + String readContent = op.read(fileName); assertEquals(content, readContent); } }