Skip to content

refactor: remove extension group name parameter and add new function to retrieve it #772

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test",
"args": [
"--gtest_filter=ExtensionTest.FailedToConnectToRemote2"
"--gtest_filter=ExtensionTest.CommandInvalidExtensionGroup"
],
"cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/",
"env": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "include_internal/ten_runtime/common/loc.h"
#include "ten_runtime/msg/msg.h"
#include "ten_utils/container/list.h"
#include "ten_utils/lib/atomic.h"
#include "ten_utils/lib/signature.h"
#include "ten_utils/lib/string.h"
#include "ten_utils/sanitizer/thread_check.h"
Expand Down Expand Up @@ -71,9 +70,18 @@ TEN_RUNTIME_PRIVATE_API void ten_extension_context_on_close(
ten_extension_context_t *self);

TEN_RUNTIME_PRIVATE_API ten_extension_info_t *
ten_extension_context_get_extension_info_by_name(
ten_extension_context_t *self, const char *app_uri, const char *graph_id,
const char *extension_group_name, const char *extension_name);
ten_extension_context_get_extension_info_by_name(ten_extension_context_t *self,
const char *app_uri,
const char *graph_id,
const char *extension_name,
bool check_thread);

TEN_RUNTIME_PRIVATE_API bool ten_extension_context_start_extension_group(
ten_extension_context_t *self, ten_error_t *err);

TEN_RUNTIME_PRIVATE_API const char *
ten_extension_context_get_extension_group_name(ten_extension_context_t *self,
const char *app_uri,
const char *graph_id,
const char *extension_name,
bool check_thread);
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include "ten_utils/container/list.h"
#include "ten_utils/io/runloop.h"
#include "ten_utils/lib/signature.h"
#include "ten_utils/lib/smart_ptr.h"
#include "ten_utils/sanitizer/thread_check.h"

#define TEN_EXTENSION_GROUP_SIGNATURE 0x94F72EDA6137DF04U
Expand Down Expand Up @@ -124,10 +123,6 @@ TEN_RUNTIME_PRIVATE_API void ten_extension_group_destroy_extensions(
TEN_RUNTIME_PRIVATE_API void ten_extension_group_set_addon(
ten_extension_group_t *self, ten_addon_host_t *addon_host);

TEN_RUNTIME_PRIVATE_API ten_shared_ptr_t *
ten_extension_group_create_cmd_result_for_invalid_dest(
ten_shared_ptr_t *origin_cmd, ten_string_t *target_group_name);

TEN_RUNTIME_PRIVATE_API ten_runloop_t *ten_extension_group_get_attached_runloop(
ten_extension_group_t *self);

Expand Down

This file was deleted.

20 changes: 15 additions & 5 deletions core/src/ten_manager/src/designer/template_pkgs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//
use actix_web::{web, HttpResponse, Responder};
use anyhow::{anyhow, Result};
use semver::Version;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use strum_macros::{Display, EnumString};
Expand Down Expand Up @@ -44,9 +45,15 @@ pub struct GetTemplateRequestPayload {
pub language: TemplateLanguage,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TemplateInfo {
pub pkg_name: String,
pub pkg_version: Version,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct GetTemplateResponseData {
pub template_name: Vec<String>,
pub templates: Vec<TemplateInfo>,
}

pub async fn get_template_endpoint(
Expand Down Expand Up @@ -85,13 +92,16 @@ pub async fn get_template_endpoint(
match result {
Ok(packages) => {
// Extract the package names from the PkgRegistryInfo structs.
let template_names: Vec<String> = packages
let templates: Vec<TemplateInfo> = packages
.iter()
.map(|pkg| pkg.basic_info.type_and_name.name.clone())
.map(|pkg| TemplateInfo {
pkg_name: pkg.basic_info.type_and_name.name.clone(),
pkg_version: pkg.basic_info.version.clone(),
})
.collect();

// Handle case where no packages were found.
if template_names.is_empty() {
if templates.is_empty() {
let error_message = format!(
"Unsupported template combination: pkg_type={}, \
language={}",
Expand All @@ -109,7 +119,7 @@ pub async fn get_template_endpoint(

let response = ApiResponse {
status: Status::Ok,
data: GetTemplateResponseData { template_name: template_names },
data: GetTemplateResponseData { templates },
meta: None,
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use actix_web::{http::StatusCode, test, web, App};

use ten_manager::{
config::{metadata::TmanMetadata, TmanConfig},
constants::{DEFAULT_APP_NODEJS, DEFAULT_EXTENSION_CPP},
designer::{
response::{ApiResponse, Status},
template_pkgs::{
Expand Down Expand Up @@ -58,7 +57,7 @@ async fn test_get_template_app_typescript() {
test::call_and_read_body_json(&app, req).await;

assert_eq!(resp.status, Status::Ok);
assert_eq!(resp.data.template_name, vec![DEFAULT_APP_NODEJS.to_string()]);
println!("{:?}", resp.data.templates);
}

#[actix_web::test]
Expand Down Expand Up @@ -96,10 +95,7 @@ async fn test_get_template_extension_cpp() {
test::call_and_read_body_json(&app, req).await;

assert_eq!(resp.status, Status::Ok);
assert_eq!(
resp.data.template_name,
vec![DEFAULT_EXTENSION_CPP.to_string()]
);
println!("{:?}", resp.data.templates);
}

#[actix_web::test]
Expand Down
18 changes: 9 additions & 9 deletions core/src/ten_runtime/app/msg_interface/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ void ten_app_do_connection_migration_or_push_to_engine_queue(
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: We are in the app thread, and all the uses of the engine in
// this function would not cause thread safety issues.
TEN_ASSERT(engine && ten_engine_check_integrity(engine, false),
"This function is called in the app thread.");
TEN_ASSERT(engine, "Should not happen.");
TEN_ASSERT(ten_engine_check_integrity(engine, false), "Should not happen.");

if (connection && ten_connection_needs_to_migrate(connection, engine)) {
ten_connection_migrate(connection, engine, msg);
Expand Down Expand Up @@ -317,8 +317,8 @@ static bool ten_app_handle_stop_graph_cmd(ten_app_t *self,
// to the engine.
ten_list_foreach (ten_msg_get_dest(cmd), iter) {
ten_loc_t *dest_loc = ten_ptr_listnode_get(iter.node);
TEN_ASSERT(dest_loc && ten_loc_check_integrity(dest_loc),
"Should not happen.");
TEN_ASSERT(dest_loc, "Should not happen.");
TEN_ASSERT(ten_loc_check_integrity(dest_loc), "Should not happen.");

ten_string_set_formatted(&dest_loc->graph_id, "%s",
ten_string_get_raw_str(&dest_engine->graph_id));
Expand Down Expand Up @@ -354,8 +354,8 @@ static bool ten_app_handle_cmd_result(ten_app_t *self,

#if defined(_DEBUG)
ten_loc_t *dest_loc = ten_msg_get_first_dest_loc(cmd_result);
TEN_ASSERT(dest_loc && ten_loc_check_integrity(dest_loc),
"Should not happen.");
TEN_ASSERT(dest_loc, "Should not happen.");
TEN_ASSERT(ten_loc_check_integrity(dest_loc), "Should not happen.");

ten_string_t loc_str;
TEN_STRING_INIT(loc_str);
Expand Down Expand Up @@ -405,9 +405,9 @@ static bool ten_app_handle_cmd_result(ten_app_t *self,
bool ten_app_dispatch_msg(ten_app_t *self, ten_shared_ptr_t *msg,
ten_error_t *err) {
ten_loc_t *dest_loc = ten_msg_get_first_dest_loc(msg);
TEN_ASSERT(dest_loc && ten_loc_check_integrity(dest_loc) &&
ten_msg_get_dest_cnt(msg) == 1,
"Should not happen.");
TEN_ASSERT(dest_loc, "Should not happen.");
TEN_ASSERT(ten_loc_check_integrity(dest_loc), "Should not happen.");
TEN_ASSERT(ten_msg_get_dest_cnt(msg) == 1, "Should not happen.");
TEN_ASSERT(!ten_string_is_empty(&dest_loc->app_uri),
"App URI should not be empty.");

Expand Down
5 changes: 3 additions & 2 deletions core/src/ten_runtime/connection/migration.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ void ten_connection_upgrade_migration_state_to_done(ten_connection_t *self,
if (engine) {
// The message is sent to the app, not an engine.

TEN_ASSERT(engine && ten_engine_check_integrity(engine, true),
"Access across threads.");
TEN_ASSERT(engine, "Invalid argument.");
TEN_ASSERT(ten_engine_check_integrity(engine, true),
"Invalid use of engine %p.", engine);

// @{
// Attach to engine.
Expand Down
4 changes: 2 additions & 2 deletions core/src/ten_runtime/engine/engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ static void ten_engine_set_graph_id(ten_engine_t *self, ten_shared_ptr_t *cmd) {
// Set the newly created graph_id to the 'start_graph' command.
ten_list_foreach (ten_msg_get_dest(cmd), iter) {
ten_loc_t *dest_loc = ten_ptr_listnode_get(iter.node);
TEN_ASSERT(dest_loc && ten_loc_check_integrity(dest_loc),
"Should not happen.");
TEN_ASSERT(dest_loc, "Should not happen.");
TEN_ASSERT(ten_loc_check_integrity(dest_loc), "Should not happen.");

ten_string_set_formatted(&dest_loc->graph_id, "%s",
ten_string_get_raw_str(&graph_id_str));
Expand Down
8 changes: 3 additions & 5 deletions core/src/ten_runtime/engine/internal/extension_interface.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ static void ten_engine_on_all_extension_threads_are_ready(

if (error_occurred) {
ten_app_t *app = self->app;
TEN_ASSERT(app && ten_app_check_integrity(app, false),
"Invalid argument.");
TEN_ASSERT(app, "Should not happen.");
TEN_ASSERT(ten_app_check_integrity(app, false), "Should not happen.");

// This graph/engine will not be functioning properly, so it will be shut
// down directly.
Expand Down Expand Up @@ -236,9 +236,7 @@ void ten_engine_find_extension_info_for_all_extensions_of_extension_thread_task(
ten_extension_context_get_extension_info_by_name(
extension_context, ten_app_get_uri(extension_context->engine->app),
ten_engine_get_id(extension_context->engine, true),
ten_extension_group_get_name(extension_thread->extension_group,
false),
ten_extension_get_name(extension, false));
ten_extension_get_name(extension, false), true);
}

if (extension_thread->is_close_triggered) {
Expand Down
96 changes: 63 additions & 33 deletions core/src/ten_runtime/engine/msg_interface/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "include_internal/ten_runtime/msg/msg.h"
#include "include_internal/ten_runtime/msg/msg_info.h"
#include "include_internal/ten_runtime/remote/remote.h"
#include "include_internal/ten_utils/value/value.h"
#include "ten_runtime/app/app.h"
#include "ten_runtime/msg/cmd_result/cmd_result.h"
#include "ten_runtime/msg/msg.h"
Expand Down Expand Up @@ -363,18 +364,37 @@ static void ten_engine_post_msg_to_extension_thread(
}
}

ten_shared_ptr_t *ten_engine_create_cmd_result_for_invalid_dest(
ten_shared_ptr_t *origin_cmd) {
TEN_ASSERT(origin_cmd, "Should not happen.");

if (!ten_msg_is_cmd_and_result(origin_cmd)) {
ten_msg_dump(origin_cmd, NULL, "Unexpected message: ^m");
TEN_ASSERT(0, "Should not happen.");
}

ten_shared_ptr_t *cmd_result =
ten_cmd_result_create_from_cmd(TEN_STATUS_CODE_ERROR, origin_cmd);
ten_msg_set_property(cmd_result, TEN_STR_DETAIL,
ten_value_create_vstring("Failed to find destination."),
NULL);

return cmd_result;
}

bool ten_engine_dispatch_msg(ten_engine_t *self, ten_shared_ptr_t *msg) {
TEN_ASSERT(self, "Should not happen.");
TEN_ASSERT(ten_engine_check_integrity(self, true), "Should not happen.");

TEN_ASSERT(msg, "Should not happen.");
TEN_ASSERT(ten_msg_check_integrity(msg), "Should not happen.");
TEN_ASSERT(ten_msg_get_dest_cnt(msg) == 1,
"When this function is executed, there should be only one "
"destination remaining in the message's dest.");

ten_loc_t *dest_loc = ten_msg_get_first_dest_loc(msg);
TEN_ASSERT(dest_loc && ten_loc_check_integrity(dest_loc),
"Should not happen.");
TEN_ASSERT(dest_loc, "Should not happen.");
TEN_ASSERT(ten_loc_check_integrity(dest_loc), "Should not happen.");

ten_app_t *app = self->app;
TEN_ASSERT(app, "Invalid argument.");
Expand Down Expand Up @@ -409,7 +429,7 @@ bool ten_engine_dispatch_msg(ten_engine_t *self, ten_shared_ptr_t *msg) {
// current TEN app.
ten_app_push_to_in_msgs_queue(app, msg);
} else {
if (ten_string_is_empty(&dest_loc->extension_group_name)) {
if (ten_string_is_empty(&dest_loc->extension_name)) {
// It means the destination is the current engine, so ask the current
// engine to handle this message.
ten_engine_handle_msg(self, msg);
Expand All @@ -419,34 +439,45 @@ bool ten_engine_dispatch_msg(ten_engine_t *self, ten_shared_ptr_t *msg) {
if (self->extension_context) {
bool found = false;

ten_list_foreach (&self->extension_context->extension_threads, iter) {
ten_extension_thread_t *extension_thread =
ten_ptr_listnode_get(iter.node);
TEN_ASSERT(
extension_thread &&
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: We are in the engine thread, _not_ in the
// extension thread. However, before the engine is closed,
// the pointer of the extension group and the pointer of the
// extension thread will not be changed, and the closing of
// the entire engine must start from the engine, so the
// execution to this position means that the engine has not
// been closed, so there will be no thread safety issue.
ten_extension_thread_check_integrity(extension_thread,
false),
"Should not happen.");

ten_extension_group_t *extension_group =
extension_thread->extension_group;

if (ten_string_is_equal(&extension_group->name,
&dest_loc->extension_group_name)) {
// Find the correct extension thread, ask it to handle the
// message.
found = true;
ten_engine_post_msg_to_extension_thread(self, extension_thread,
msg);
break;
const char *extension_group_name =
ten_extension_context_get_extension_group_name(
self->extension_context,
ten_string_get_raw_str(&dest_loc->app_uri),
ten_string_get_raw_str(&dest_loc->graph_id),
ten_string_get_raw_str(&dest_loc->extension_name), true);

if (extension_group_name) {
ten_list_foreach (&self->extension_context->extension_threads,
iter) {
ten_extension_thread_t *extension_thread =
ten_ptr_listnode_get(iter.node);
TEN_ASSERT(
extension_thread &&
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: We are in the engine thread, _not_ in the
// extension thread. However, before the engine is closed,
// the pointer of the extension group and the pointer of
// the extension thread will not be changed, and the
// closing of the entire engine must start from the
// engine, so the execution to this position means that
// the engine has not been closed, so there will be no
// thread safety issue.
ten_extension_thread_check_integrity(extension_thread,
false),
"Should not happen.");

ten_extension_group_t *extension_group =
extension_thread->extension_group;

if (ten_string_is_equal_c_str(&extension_group->name,
extension_group_name)) {
// Find the correct extension thread, ask it to handle the
// message.
found = true;
ten_engine_post_msg_to_extension_thread(self, extension_thread,
msg);
break;
}
}
}

Expand All @@ -457,8 +488,7 @@ bool ten_engine_dispatch_msg(ten_engine_t *self, ten_shared_ptr_t *msg) {

if (ten_msg_is_cmd(msg)) {
ten_shared_ptr_t *cmd_result =
ten_extension_group_create_cmd_result_for_invalid_dest(
msg, &dest_loc->extension_group_name);
ten_engine_create_cmd_result_for_invalid_dest(msg);

ten_engine_dispatch_msg(self, cmd_result);

Expand Down
Loading
Loading