Skip to content

feat(bindings/java): implement async ops to pass AsyncStepsTest #2291

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,8 @@ public Metadata stat(String fileName) {
protected native void disposeInternal(long handle);

private static native long constructor(String schema, Map<String, String> map);

private static native void write(long nativeHandle, String fileName, String content);

private static native String read(long nativeHandle, String fileName);

private static native void delete(long nativeHandle, String fileName);

private static native long stat(long nativeHandle, String file);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.apache.opendal;

public class Metadata extends NativeObject {
public Metadata(long nativeHandle) {
protected Metadata(long nativeHandle) {
super(nativeHandle);
}

Expand Down
55 changes: 51 additions & 4 deletions bindings/java/src/main/java/org/apache/opendal/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,71 @@
package org.apache.opendal;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class Operator extends NativeObject {
private static AsyncRegistry registry() {
return AsyncRegistry.INSTANCE;
}

private enum AsyncRegistry {
INSTANCE;

private final Map<Long, CompletableFuture<?>> registry = new ConcurrentHashMap<>();

@SuppressWarnings("unused") // called by jni-rs
private long requestId() {
final CompletableFuture<?> f = new CompletableFuture<>();
while (true) {
final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
final CompletableFuture<?> prev = registry.putIfAbsent(requestId, f);
if (prev == null) {
return requestId;
}
}
}

private CompletableFuture<?> get(long requestId) {
return registry.get(requestId);
}

@SuppressWarnings("unchecked")
private <T> CompletableFuture<T> take(long requestId) {
final CompletableFuture<?> f = get(requestId);
if (f != null) {
f.whenComplete((r, e) -> registry.remove(requestId));
}
return (CompletableFuture<T>) f;
}
}

public Operator(String schema, Map<String, String> map) {
super(constructor(schema, map));
}


public CompletableFuture<Void> write(String fileName, String content) {
return write(nativeHandle, fileName, content);
final long requestId = write(nativeHandle, fileName, content);
return registry().take(requestId);
}

public CompletableFuture<Metadata> stat(String fileName) {
final long requestId = stat(nativeHandle, fileName);
final CompletableFuture<Long> f = registry().take(requestId);
return f.thenApply(Metadata::new);
}

public CompletableFuture<String> read(String fileName) {
final long requestId = read(nativeHandle, fileName);
return registry().take(requestId);
}

@Override
protected native void disposeInternal(long handle);

private static native long constructor(String schema, Map<String, String> map);

private static native CompletableFuture<Void> write(long nativeHandle, String fileName, String content);
private static native long read(long nativeHandle, String fileName);
private static native long write(long nativeHandle, String fileName, String content);
private static native long stat(long nativeHandle, String file);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public class ODException extends RuntimeException {
private final Code code;

@SuppressWarnings("unused") // called by jni-rs
public ODException(String code, String message) {
this(Code.valueOf(code), message);
}
Expand Down
170 changes: 147 additions & 23 deletions bindings/java/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

use std::str::FromStr;

use jni::objects::{JClass, JObject, JString, JValue};
use jni::sys::{jlong, jobject};
use jni::objects::{JClass, JObject, JString, JValue, JValueOwned};
use jni::sys::jlong;
use jni::JNIEnv;

use opendal::{Operator, Scheme};

use crate::error::Error;
use crate::{get_current_env, Result};
use crate::{jmap_to_hashmap, RUNTIME};

Expand Down Expand Up @@ -69,10 +68,10 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_write(
op: *mut Operator,
file: JString,
content: JString,
) -> jobject {
) -> jlong {
intern_write(&mut env, op, file, content).unwrap_or_else(|e| {
e.throw(&mut env);
JObject::null().into_raw()
0
})
}

Expand All @@ -81,41 +80,147 @@ fn intern_write(
op: *mut Operator,
file: JString,
content: JString,
) -> Result<jobject> {
) -> Result<jlong> {
let op = unsafe { &mut *op };
let id = request_id(env)?;

let file = env.get_string(&file)?.to_str()?.to_string();
let content = env.get_string(&content)?.to_str()?.to_string();

let runtime = unsafe { RUNTIME.get_unchecked() };
runtime.spawn(async move {
let result = do_write(op, file, content).await;
complete_future(id, result.map(|_| JValueOwned::Void))
});

Ok(id)
}

async fn do_write(op: &mut Operator, file: String, content: String) -> Result<()> {
Ok(op.write(&file, content).await?)
}

/// # Safety
///
/// This function should not be called before the Operator are ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_Operator_stat(
mut env: JNIEnv,
_: JClass,
op: *mut Operator,
file: JString,
) -> jlong {
intern_stat(&mut env, op, file).unwrap_or_else(|e| {
e.throw(&mut env);
0
})
}

fn intern_stat(env: &mut JNIEnv, op: *mut Operator, file: JString) -> Result<jlong> {
let op = unsafe { &mut *op };
let file: String = env.get_string(&file)?.into();
let content: String = env.get_string(&content)?.into();
let id = request_id(env)?;

let file = env.get_string(&file)?.to_str()?.to_string();

let runtime = unsafe { RUNTIME.get_unchecked() };
runtime.spawn(async move {
let result = do_stat(op, file).await;
complete_future(id, result.map(JValueOwned::Long))
});

let class = "java/util/concurrent/CompletableFuture";
let f = env.new_object(class, "()V", &[])?;
Ok(id)
}

async fn do_stat(op: &mut Operator, file: String) -> Result<jlong> {
let metadata = op.stat(&file).await?;
Ok(Box::into_raw(Box::new(metadata)) as jlong)
}

/// # Safety
///
/// This function should not be called before the Operator are ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_Operator_read(
mut env: JNIEnv,
_: JClass,
op: *mut Operator,
file: JString,
) -> jlong {
intern_read(&mut env, op, file).unwrap_or_else(|e| {
e.throw(&mut env);
0
})
}

fn intern_read(env: &mut JNIEnv, op: *mut Operator, file: JString) -> Result<jlong> {
let op = unsafe { &mut *op };
let id = request_id(env)?;

// keep the future alive, so that we can complete it later
// but this approach will be limited by global ref table size (65535)
let future = env.new_global_ref(&f)?;
let file = env.get_string(&file)?.to_str()?.to_string();

let runtime = unsafe { RUNTIME.get_unchecked() };
runtime.spawn(async move {
let result = match op.write(&file, content).await {
Ok(()) => Ok(JObject::null()),
Err(err) => Err(Error::from(err)),
};
complete_future(future.as_ref(), result)
let result = do_read(op, file).await;
complete_future(id, result.map(JValueOwned::Object))
});

Ok(f.into_raw())
Ok(id)
}

async fn do_read<'local>(op: &mut Operator, file: String) -> Result<JObject<'local>> {
let content = op.read(&file).await?;
let content = String::from_utf8(content)?;

let env = unsafe { get_current_env() };
let result = env.new_string(content)?;
Ok(result.into())
}

fn request_id(env: &mut JNIEnv) -> Result<jlong> {
let registry = env
.call_static_method(
"org/apache/opendal/Operator",
"registry",
"()Lorg/apache/opendal/Operator$AsyncRegistry;",
&[],
)?
.l()?;
Ok(env.call_method(registry, "requestId", "()J", &[])?.j()?)
}

fn complete_future(future: &JObject, result: Result<JObject>) {
fn make_object<'local>(
env: &mut JNIEnv<'local>,
value: JValueOwned<'local>,
) -> Result<JObject<'local>> {
let o = match value {
JValueOwned::Object(o) => o,
JValueOwned::Byte(_) => env.new_object("java/lang/Long", "(B)V", &[value.borrow()])?,
JValueOwned::Char(_) => env.new_object("java/lang/Char", "(C)V", &[value.borrow()])?,
JValueOwned::Short(_) => env.new_object("java/lang/Short", "(S)V", &[value.borrow()])?,
JValueOwned::Int(_) => env.new_object("java/lang/Integer", "(I)V", &[value.borrow()])?,
JValueOwned::Long(_) => env.new_object("java/lang/Long", "(J)V", &[value.borrow()])?,
JValueOwned::Bool(_) => env.new_object("java/lang/Boolean", "(Z)V", &[value.borrow()])?,
JValueOwned::Float(_) => env.new_object("java/lang/Float", "(F)V", &[value.borrow()])?,
JValueOwned::Double(_) => env.new_object("java/lang/Double", "(D)V", &[value.borrow()])?,
JValueOwned::Void => JObject::null(),
};
Ok(o)
}

fn complete_future(id: jlong, result: Result<JValueOwned>) {
let mut env = unsafe { get_current_env() };
let future = get_future(&mut env, id).unwrap();
match result {
Ok(result) => env
.call_method(
Ok(result) => {
let result = make_object(&mut env, result).unwrap();
env.call_method(
future,
"complete",
"(Ljava/lang/Object;)Z",
&[JValue::Object(&result)],
)
.unwrap(),
.unwrap()
}
Err(err) => {
let exception = err.to_exception(&mut env).unwrap();
env.call_method(
Expand All @@ -128,3 +233,22 @@ fn complete_future(future: &JObject, result: Result<JObject>) {
}
};
}

fn get_future<'local>(env: &mut JNIEnv<'local>, id: jlong) -> Result<JObject<'local>> {
let registry = env
.call_static_method(
"org/apache/opendal/Operator",
"registry",
"()Lorg/apache/opendal/Operator$AsyncRegistry;",
&[],
)?
.l()?;
Ok(env
.call_method(
registry,
"get",
"(J)Ljava/util/concurrent/CompletableFuture;",
&[JValue::Long(id)],
)?
.l()?)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,42 +26,47 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class AsyncStepsTest {
Operator operator;
Operator op;

@Given("A new OpenDAL Async Operator")
public void a_new_open_dal_async_operator() {
Map<String, String> params = new HashMap<>();
params.put("root", "/tmp");
operator = new Operator("Memory", params);
op = new Operator("Memory", params);
}

@When("Async write path {string} with content {string}")
public void async_write_path_test_with_content_hello_world(String fileName, String content) {
CompletableFuture<Void> f = operator.write(fileName, content);

f.join();

assertTrue(f.isDone());
assertFalse(f.isCompletedExceptionally());
op.write(fileName, content).join();
}

@Then("The async file {string} should exist")
public void the_async_file_test_should_exist(String fileName) {
Metadata metadata = op.stat(fileName).join();
assertNotNull(metadata);
}

@Then("The async file {string} entry mode must be file")
public void the_async_file_test_entry_mode_must_be_file(String fileName) {
Metadata metadata = op.stat(fileName).join();
assertTrue(metadata.isFile());
}

@Then("The async file {string} content length must be {int}")
public void the_async_file_test_content_length_must_be_13(String fileName, int length) {
Metadata metadata = op.stat(fileName).join();
assertEquals(metadata.getContentLength(), length);
}

@Then("The async file {string} must have content {string}")
public void the_async_file_test_must_have_content_hello_world(String fileName, String content) {
String readContent = op.read(fileName).join();
assertEquals(content, readContent);
}
}
Loading