Skip to content

refactor: remove extension group name from message handling functions and update related tests #774

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test",
"args": [
"--gtest_filter=ExtensionTest.CommandInvalidExtensionGroup"
"--gtest_filter=BasicTest.ThrowExceptionInExtension"
],
"cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/",
"env": {
Expand Down
2 changes: 1 addition & 1 deletion core/include/ten_runtime/binding/cpp/detail/extension.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class extension_t : public binding_handle_t {
TEN_ASSERT(stop_graph_cmd, "Should not happen.");

ten_msg_clear_and_set_dest(stop_graph_cmd, "localhost", nullptr, nullptr,
nullptr, nullptr);
nullptr);
ten_env_send_cmd(ten_env.get_c_ten_env(), stop_graph_cmd, nullptr, nullptr,
nullptr, nullptr);
ten_shared_ptr_destroy(stop_graph_cmd);
Expand Down
5 changes: 2 additions & 3 deletions core/include/ten_runtime/binding/cpp/detail/msg/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ class msg_t {
return ten_msg_get_name(c_msg);
}

bool set_dest(const char *uri, const char *graph,
const char *extension_group_name, const char *extension_name,
bool set_dest(const char *uri, const char *graph, const char *extension_name,
error_t *err = nullptr) const {
TEN_ASSERT(c_msg, "Should not happen.");

Expand All @@ -70,7 +69,7 @@ class msg_t {
}

return ten_msg_clear_and_set_dest(
c_msg, uri, graph, extension_group_name, extension_name,
c_msg, uri, graph, extension_name,
err != nullptr ? err->get_c_error() : nullptr);
}

Expand Down
9 changes: 5 additions & 4 deletions core/include/ten_runtime/msg/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ TEN_RUNTIME_API ten_value_t *ten_msg_peek_property(ten_shared_ptr_t *self,
const char *path,
ten_error_t *err);

TEN_RUNTIME_API bool ten_msg_clear_and_set_dest(
ten_shared_ptr_t *self, const char *app_uri, const char *graph_id,
const char *extension_group_name, const char *extension_name,
ten_error_t *err);
TEN_RUNTIME_API bool ten_msg_clear_and_set_dest(ten_shared_ptr_t *self,
const char *app_uri,
const char *graph_id,
const char *extension_name,
ten_error_t *err);

TEN_RUNTIME_API bool ten_msg_from_json(ten_shared_ptr_t *self, ten_json_t *json,
ten_error_t *err);
Expand Down
7 changes: 4 additions & 3 deletions core/include_internal/ten_runtime/msg/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ TEN_RUNTIME_PRIVATE_API void ten_msg_set_src_to_extension_group(
TEN_RUNTIME_PRIVATE_API void ten_msg_clear_and_set_dest_from_msg_src(
ten_shared_ptr_t *self, ten_shared_ptr_t *cmd);

TEN_RUNTIME_PRIVATE_API void ten_raw_msg_add_dest(
ten_msg_t *self, const char *app_uri, const char *graph_id,
const char *extension_group_name, const char *extension_name);
TEN_RUNTIME_PRIVATE_API void ten_raw_msg_add_dest(ten_msg_t *self,
const char *app_uri,
const char *graph_id,
const char *extension_name);

TEN_RUNTIME_PRIVATE_API void ten_raw_msg_clear_dest(ten_msg_t *self);

Expand Down
8 changes: 7 additions & 1 deletion core/src/ten_manager/src/fs/file_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,10 @@ async fn watch_file_task(
loop {
// Wait for stop or the next check interval.
tokio::select! {
_ = &mut stop_rx => break,
_ = &mut stop_rx => {
eprintln!("Stopping file watcher");
break;
},
_ = tokio::time::sleep(options.check_interval) => {
// Check if the file has been rotated or truncated.
match std::fs::metadata(&path) {
Expand Down Expand Up @@ -217,6 +220,7 @@ async fn watch_file_task(
if curr_len > last_pos {
// Read the new part.
if let Err(e) = file.seek(SeekFrom::Start(last_pos)) {
eprintln!("Error seeking to position: {}", e);
let _ = content_tx.send(Err(anyhow::anyhow!(e))).await;
break;
}
Expand All @@ -231,13 +235,15 @@ async fn watch_file_task(
}
Ok(_) => {}
Err(e) => {
eprintln!("Error reading from file: {}", e);
let _ = content_tx.send(Err(anyhow::anyhow!(e))).await;
break;
}
}
} else {
// Reached EOF, check if the timeout has been reached.
if Instant::now().duration_since(last_activity) > options.timeout {
eprintln!("Timeout reached, breaking");
break;
}
}
Expand Down
26 changes: 17 additions & 9 deletions core/src/ten_manager/tests/test_case/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,40 @@ mod tests {
temp_file.write_all(test_content)?;
temp_file.flush()?;

// Create options with shorter timeout for testing
// Create options with shorter timeout for testing.
let options = FileWatchOptions {
timeout: Duration::from_secs(5),
buffer_size: 1024,
check_interval: Duration::from_millis(100),
};

// Start watching the file
// Start watching the file.
let mut stream =
watch_file(temp_file.path(), Some(options)).await?;

// Get the first chunk
// Get the first chunk.
let chunk = stream.next().await.expect("Should receive data")?;
assert_eq!(chunk, test_content);

// Write more content to the file
// Write more content to the file.
let more_content = b"More content!";
temp_file.write_all(more_content)?;
temp_file.flush()?;

// Get the second chunk
let chunk =
stream.next().await.expect("Should receive more data")?;
assert_eq!(chunk, more_content);
// Get the second chunk.
let chunk = stream.next().await;
match chunk {
Some(chunk) => match chunk {
Ok(chunk) => assert_eq!(chunk, more_content),
Err(e) => panic!("Should receive more data: {}", e),
},
None => {
panic!("Should receive more data");
}
}

// Stop watching
// Stop watching.
println!("Stopping stream");
stream.stop();

Ok(())
Expand Down
24 changes: 17 additions & 7 deletions core/src/ten_runtime/addon/addon.c
Original file line number Diff line number Diff line change
Expand Up @@ -373,14 +373,24 @@ static void ten_app_create_addon_instance(ten_app_t *app,
ten_string_get_raw_str(&addon_context->instance_name);
TEN_ASSERT(instance_name, "Should not happen.");

if (ten_c_string_is_empty(addon_name) ||
ten_c_string_is_empty(instance_name)) {
TEN_LOGI(
"The addon name or instance name is empty, will not create the addon "
"instance.");
if (addon_context->flow ==
TEN_ADDON_CONTEXT_FLOW_ENGINE_CREATE_EXTENSION_GROUP) {
if (ten_c_string_is_empty(addon_name)) {
TEN_LOGI("The addon name is empty, will not create the extension group.");

ten_app_notify_create_addon_instance_failed(app, addon_context);
return;
ten_app_notify_create_addon_instance_failed(app, addon_context);
return;
}
} else {
if (ten_c_string_is_empty(addon_name) ||
ten_c_string_is_empty(instance_name)) {
TEN_LOGI(
"The addon name or instance name is empty, will not create the addon "
"instance.");

ten_app_notify_create_addon_instance_failed(app, addon_context);
return;
}
}

TEN_LOGD("Try to find addon for %s", addon_name);
Expand Down
8 changes: 4 additions & 4 deletions core/src/ten_runtime/app/predefined_graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ ten_app_build_start_graph_cmd_to_start_predefined_graph(
ten_shared_ptr_t *start_graph_cmd = ten_cmd_start_graph_create();
TEN_ASSERT(start_graph_cmd, "Should not happen.");

ten_msg_clear_and_set_dest(start_graph_cmd, app_uri, NULL, NULL, NULL, err);
ten_msg_clear_and_set_dest(start_graph_cmd, app_uri, NULL, NULL, err);

void *json_ctx = ten_json_create_new_ctx();
ten_json_t start_graph_cmd_json = TEN_JSON_INIT_VAL(json_ctx, true);
ten_json_init_object(&start_graph_cmd_json);

ten_json_t ten_json = TEN_JSON_INIT_VAL(json_ctx, false);
bool success = ten_json_object_peek_or_create_object(
&start_graph_cmd_json, TEN_STR_TEN, &ten_json);
bool success = ten_json_object_peek_or_create_object(&start_graph_cmd_json,
TEN_STR_TEN, &ten_json);
TEN_ASSERT(success, "Should not happen.");

ten_json_t nodes_json = TEN_JSON_INIT_VAL(json_ctx, false);
Expand Down Expand Up @@ -193,7 +193,7 @@ static void ten_app_start_auto_start_predefined_graph_result_handler(

ten_shared_ptr_t *close_app_cmd = ten_cmd_close_app_create();
ten_msg_clear_and_set_dest(close_app_cmd, ten_string_get_raw_str(&app->uri),
NULL, NULL, NULL, err);
NULL, NULL, err);
ten_env_send_cmd(ten_env, close_app_cmd, NULL, NULL, NULL, err);
}
}
Expand Down
20 changes: 8 additions & 12 deletions core/src/ten_runtime/binding/go/interface/ten_runtime/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ type Msg interface {

GetName() (string, error)
SetDest(
appUri string,
graphId string,
extensionGroup string,
appURI string,
graphID string,
extension string,
) error

Expand Down Expand Up @@ -205,22 +204,19 @@ func (p *msg) GetName() (string, error) {
}

func (p *msg) SetDest(
appUri string,
graphId string,
extensionGroup string,
appURI string,
graphID string,
extension string,
) error {
defer p.keepAlive()

err := withCGOLimiter(func() error {
apiStatus := C.ten_go_msg_set_dest(
p.cPtr,
unsafe.Pointer(unsafe.StringData(appUri)),
C.int(len(appUri)),
unsafe.Pointer(unsafe.StringData(graphId)),
C.int(len(graphId)),
unsafe.Pointer(unsafe.StringData(extensionGroup)),
C.int(len(extensionGroup)),
unsafe.Pointer(unsafe.StringData(appURI)),
C.int(len(appURI)),
unsafe.Pointer(unsafe.StringData(graphID)),
C.int(len(graphID)),
unsafe.Pointer(unsafe.StringData(extension)),
C.int(len(extension)),
)
Expand Down
6 changes: 2 additions & 4 deletions core/src/ten_runtime/binding/go/interface/ten_runtime/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,5 @@ ten_go_error_t ten_go_msg_get_name(uintptr_t bridge_addr, const char **name);

ten_go_error_t ten_go_msg_set_dest(uintptr_t bridge_addr, const void *app_uri,
int app_uri_len, const void *graph_id,
int graph_id_len,
const void *extension_group,
int extension_group_len,
const void *extension, int extension_len);
int graph_id_len, const void *extension,
int extension_len);
12 changes: 2 additions & 10 deletions core/src/ten_runtime/binding/go/native/msg/msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -816,10 +816,8 @@ ten_go_error_t ten_go_msg_get_name(uintptr_t bridge_addr, const char **name) {

ten_go_error_t ten_go_msg_set_dest(uintptr_t bridge_addr, const void *app_uri,
int app_uri_len, const void *graph_id,
int graph_id_len,
const void *extension_group,
int extension_group_len,
const void *extension, int extension_len) {
int graph_id_len, const void *extension,
int extension_len) {
ten_go_msg_t *self = ten_go_msg_reinterpret(bridge_addr);
TEN_ASSERT(self && ten_go_msg_check_integrity(self), "Should not happen.");

Expand All @@ -832,10 +830,6 @@ ten_go_error_t ten_go_msg_set_dest(uintptr_t bridge_addr, const void *app_uri,
ten_string_t graph_id_str;
ten_string_init_from_c_str_with_size(&graph_id_str, graph_id, graph_id_len);

ten_string_t extension_group_str;
ten_string_init_from_c_str_with_size(&extension_group_str, extension_group,
extension_group_len);

ten_string_t extension_str;
ten_string_init_from_c_str_with_size(&extension_str, extension,
extension_len);
Expand All @@ -846,7 +840,6 @@ ten_go_error_t ten_go_msg_set_dest(uintptr_t bridge_addr, const void *app_uri,
bool rc = ten_msg_clear_and_set_dest(
ten_go_msg_c_msg(self), ten_string_get_raw_str(&app_uri_str),
ten_string_get_raw_str(&graph_id_str),
ten_string_get_raw_str(&extension_group_str),
ten_string_get_raw_str(&extension_str), &err);

if (!rc) {
Expand All @@ -857,7 +850,6 @@ ten_go_error_t ten_go_msg_set_dest(uintptr_t bridge_addr, const void *app_uri,
ten_error_deinit(&err);
ten_string_deinit(&app_uri_str);
ten_string_deinit(&graph_id_str);
ten_string_deinit(&extension_group_str);
ten_string_deinit(&extension_str);

return cgo_error;
Expand Down
2 changes: 0 additions & 2 deletions core/src/ten_runtime/binding/nodejs/interface/msg/msg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@ export class Msg {
setDest(
appUri: string | undefined = undefined,
graphId: string | undefined = undefined,
extensionGroup: string | undefined = undefined,
extension: string | undefined = undefined,
) {
ten_addon.ten_nodejs_msg_set_dest(
this,
appUri,
graphId,
extensionGroup,
extension,
);
}
Expand Down
20 changes: 5 additions & 15 deletions core/src/ten_runtime/binding/nodejs/native/msg/msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ static napi_value ten_nodejs_msg_get_name(napi_env env,

static napi_value ten_nodejs_msg_set_dest(napi_env env,
napi_callback_info info) {
const size_t argc = 5;
napi_value args[argc]; // this, app_uri, graph_id, extension_group, extension
const size_t argc = 4;
napi_value args[argc]; // this, app_uri, graph_id, extension
if (!ten_nodejs_get_js_func_args(env, info, args, argc)) {
napi_fatal_error(NULL, NAPI_AUTO_LENGTH,
"Incorrect number of parameters passed.",
Expand All @@ -98,9 +98,6 @@ static napi_value ten_nodejs_msg_set_dest(napi_env env,
ten_string_t graph_id;
TEN_STRING_INIT(graph_id);

ten_string_t extension_group;
TEN_STRING_INIT(extension_group);

ten_string_t extension;
TEN_STRING_INIT(extension);

Expand All @@ -115,20 +112,14 @@ static napi_value ten_nodejs_msg_set_dest(napi_env env,
}

if (!is_js_undefined(env, args[3])) {
bool rc = ten_nodejs_get_str_from_js(env, args[3], &extension_group);
RETURN_UNDEFINED_IF_NAPI_FAIL(rc, "Failed to get extension group", NULL);
}

if (!is_js_undefined(env, args[4])) {
bool rc = ten_nodejs_get_str_from_js(env, args[4], &extension);
bool rc = ten_nodejs_get_str_from_js(env, args[3], &extension);
RETURN_UNDEFINED_IF_NAPI_FAIL(rc, "Failed to get extension", NULL);
}

bool rc = ten_msg_clear_and_set_dest(
msg_bridge->msg, ten_string_get_raw_str(&app_uri),
ten_string_get_raw_str(&graph_id),
ten_string_get_raw_str(&extension_group),
ten_string_get_raw_str(&extension), &err);
ten_string_get_raw_str(&graph_id), ten_string_get_raw_str(&extension),
&err);
if (!rc) {
ten_string_t code_str;
ten_string_init_formatted(&code_str, "%d", ten_error_code(&err));
Expand All @@ -141,7 +132,6 @@ static napi_value ten_nodejs_msg_set_dest(napi_env env,

ten_string_deinit(&app_uri);
ten_string_deinit(&graph_id);
ten_string_deinit(&extension_group);
ten_string_deinit(&extension);
ten_error_deinit(&err);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class _Msg:
self,
app_uri: Optional[str],
graph_id: Optional[str],
extension_group: Optional[str],
extension: Optional[str],
) -> None: ...
def set_property_from_json(
Expand Down
Loading
Loading