Skip to content

Commit 37c14d5

Browse files
committed
Merge branch 'dev'
2 parents 6053b87 + b3056f6 commit 37c14d5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+649
-179
lines changed

.cargo/config.toml

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[build]
2+
rustflags = ["--cfg", "tokio_unstable"]

.env

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
DATABASE_URL=postgres://postgres:[email protected].40.3:5432/data_process_web
2-
HOST=127.0.0.1
1+
DATABASE_URL=mysql://root:[email protected].10.74:3306/data_process_web
2+
HOST=0.0.0.0
33
PORT=18000
4-
CACHE_DATABASE_URL=postgres://postgres:[email protected].40.3:5432/data_process_cache
4+
CACHE_DATABASE_URL=mysql://root:[email protected].10.74:3306/data_process_cache
55
JWT_SECRET=data_process
66
LOG_LEVEL=DEBUG

.zed/settings.json

Whitespace-only changes.

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/process_core/src/http.rs

+48-17
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use reqwest::{
77
header::{self, HeaderName, HeaderValue},
88
Method,
99
};
10+
use sea_orm::{DbBackend, Statement};
1011
use serde::{Deserialize, Serialize};
1112
use serde_json::Value;
1213
use tracing::{debug, error};
@@ -156,8 +157,6 @@ impl Serde for Http {
156157
) {
157158
Err(_) => {}
158159
Ok(x) => {
159-
let a = x.to_string();
160-
println!("a {a}");
161160
self.data = x;
162161
}
163162
}
@@ -206,40 +205,72 @@ pub fn generate_sql_list(template_sql: &str, data: &Value) -> Result<Vec<String>
206205
}
207206

208207
let mut key_vec = vec![];
208+
let mut rel_key_vec = vec![];
209209
let mut i = 0;
210+
let char_size1 = "$".as_bytes().len();
211+
let char_size2 = "'".as_bytes().len();
212+
let char_size3 = "}".as_bytes().len();
213+
210214
while i < temp_index_vec.len() {
211-
let one_index = temp_index_vec[i].0 - "$".as_bytes().len(); // 取"{"前$的字节索引
212-
let two_index = temp_index_vec[i + 1].0;
215+
rel_key_vec
216+
.push(template_sql[temp_index_vec[i].0 + 1..temp_index_vec[i + 1].0].to_string());
217+
218+
let mut one_index = temp_index_vec[i].0 - char_size1; // 取"{"前$的字节索引
219+
let mut two_index = temp_index_vec[i + 1].0;
220+
221+
let pre_one = &template_sql[(one_index - char_size2)..one_index];
222+
let post_two =
223+
&template_sql[(two_index + char_size3)..(two_index + char_size3 + char_size2)];
213224

214-
key_vec.push(template_sql[one_index..two_index + "}".as_bytes().len()].to_string());
225+
if pre_one == "'" && post_two == "'" {
226+
one_index -= char_size1;
227+
two_index += char_size2;
228+
}
229+
230+
key_vec.push(template_sql[one_index..two_index + char_size3].to_string());
215231

216232
i += 2;
217233
}
234+
let mut template_sql = template_sql.to_string();
218235

219-
let mut result_vec: Vec<String> = vec![];
236+
for item in &key_vec {
237+
// template_sql = template_sql.replace(item, format!("${}", index + 1).as_str());
238+
template_sql = template_sql.replace(item, "?");
239+
}
220240

221-
for key in key_vec {
222-
let rel_key = &key[2..key.len() - 1];
223-
let value = find_value(rel_key, data, true)
224-
.map_err(|err| anyhow!("{err} 未在rel_key: {rel_key} data:{}中找到数据", data))?;
241+
let mut result_vec: Vec<String> = vec![];
242+
let mut result_values: Vec<Vec<String>> = vec![];
243+
for key in &rel_key_vec {
244+
// let rel_key = &key[2..key.len() - 1];
245+
let value = find_value(key, data, true)
246+
.map_err(|err| anyhow!("{err} 未在rel_key: {key} data:{}中找到数据", data))?;
225247
if let Some(list) = value.as_array() {
226248
for i in 0..list.len() {
227-
let item: &str;
249+
let mut item;
228250
let temp_string = list[i].to_string();
229251
if let Some(x) = list[i].as_str() {
230-
item = x;
252+
item = x.to_string();
231253
} else {
232-
item = temp_string.as_str();
254+
item = temp_string;
233255
}
234-
235-
if result_vec.get(i).is_none() {
236-
result_vec.push(template_sql.replace(key.as_str(), item));
256+
// TIPS 做这个转换是为了防止值影响以;来切割SQL语句的方法 查看:crates/process_web/src/service/collect_config_service.rs 219行
257+
item = item.replace(';', r#"\:"#);
258+
if result_values.get(i).is_none() {
259+
result_values.push(vec![item.to_string()]);
237260
} else {
238-
result_vec[i] = result_vec[i].replace(key.as_str(), item);
261+
result_values[i].push(item.to_string());
239262
}
240263
}
241264
}
242265
}
266+
for values in result_values {
267+
let sql = Statement::from_sql_and_values(
268+
DbBackend::MySql,
269+
template_sql.clone(),
270+
values.iter().map(|x| x.into()),
271+
);
272+
result_vec.push(sql.to_string());
273+
}
243274

244275
Ok(result_vec)
245276
}

crates/process_web/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ sea-orm = { version = "0.12", features = [ "sqlx-postgres", "sqlx-mysql", "runti
1010
anyhow = "1.0.75"
1111
dotenvy = "0.15.7"
1212
tokio = { version = "1.34.0", features = ["full"] }
13+
tokio-util = "0.7.10"
1314
tracing = "0.1"
1415
tracing-subscriber = "0.3.0"
1516
tracing-appender = "0.2"
+6-4
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,34 @@
11
pub use sea_orm_migration::prelude::*;
22

3-
mod m20240119_000002_create_collect_log_table;
4-
mod m20240119_030002_create_sync_log_table;
53
mod m20240119_000001_create_collect_config_table;
4+
mod m20240119_000002_create_collect_log_table;
65
mod m20240119_023953_create_sync_config_table;
6+
mod m20240119_030002_create_sync_log_table;
77
mod m20240226_015923_create_data_source_list;
88
mod m20240227_022320_create_data_sharing_config_table;
99
mod m20240304_063328_create_sharing_request_log;
1010
mod m20240319_061549_update_data_sharing_config_table;
1111
mod m20240327_063820_update_sharing_request_log;
1212
mod m20240402_033637_update_collect_config_table;
13+
mod m20240408_033448_update_collect_log_table;
1314

1415
pub struct Migrator;
1516

1617
#[async_trait::async_trait]
1718
impl MigratorTrait for Migrator {
1819
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
1920
vec![
20-
Box::new(m20240119_000001_create_collect_config_table::Migration),
2121
Box::new(m20240119_000002_create_collect_log_table::Migration),
22-
Box::new(m20240119_023953_create_sync_config_table::Migration),
2322
Box::new(m20240119_030002_create_sync_log_table::Migration),
23+
Box::new(m20240119_000001_create_collect_config_table::Migration),
24+
Box::new(m20240119_023953_create_sync_config_table::Migration),
2425
Box::new(m20240226_015923_create_data_source_list::Migration),
2526
Box::new(m20240227_022320_create_data_sharing_config_table::Migration),
2627
Box::new(m20240304_063328_create_sharing_request_log::Migration),
2728
Box::new(m20240319_061549_update_data_sharing_config_table::Migration),
2829
Box::new(m20240327_063820_update_sharing_request_log::Migration),
2930
Box::new(m20240402_033637_update_collect_config_table::Migration),
31+
Box::new(m20240408_033448_update_collect_log_table::Migration),
3032
]
3133
}
3234
}

crates/process_web/migration/src/m20240119_000002_create_collect_log_table.rs

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ pub enum CollectLog {
7777
Id,
7878
RunningLog,
7979
CollectConfigId,
80+
TaskId,
8081
Status,
8182
UpdateTime,
8283
CreateTime,

crates/process_web/migration/src/m20240402_033637_update_collect_config_table.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ pub struct Migration;
88
#[async_trait::async_trait]
99
impl MigrationTrait for Migration {
1010
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
11-
1211
manager
1312
.alter_table(
1413
Table::alter()
@@ -18,9 +17,8 @@ impl MigrationTrait for Migration {
1817
.json()
1918
.comment(r#"数据库列配置2"#),
2019
)
21-
.to_owned()
20+
.to_owned(),
2221
)
2322
.await
2423
}
25-
2624
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use sea_orm_migration::prelude::*;
2+
3+
use crate::m20240119_000002_create_collect_log_table::CollectLog;
4+
5+
#[derive(DeriveMigrationName)]
6+
pub struct Migration;
7+
8+
#[async_trait::async_trait]
9+
impl MigrationTrait for Migration {
10+
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
11+
manager
12+
.alter_table(
13+
Table::alter()
14+
.table(CollectLog::Table)
15+
.add_column(
16+
ColumnDef::new(CollectLog::TaskId)
17+
.string()
18+
.comment(r#"正在执行的任务的id"#),
19+
)
20+
.to_owned(),
21+
)
22+
.await
23+
}
24+
}

crates/process_web/src/api/collect_config.rs

+24-7
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,16 @@ use axum::{
77
Json, Router,
88
};
99
use serde::Deserialize;
10+
use tracing::debug;
1011
use ts_rs::TS;
12+
use uuid::Uuid;
1113

12-
use crate::api::common::{AppError, AppState, PaginationPayload, ResJson, ResJsonWithPagination};
14+
use crate::api::common::{
15+
AppError, AppState, LogTask, PaginationPayload, ResJson, ResJsonWithPagination,
16+
};
1317
use crate::entity::collect_config::Model;
1418
use crate::service::collect_config_service::CollectConfigService;
15-
use crate::{bool_response, data_response, pagination_response, res_template_ok};
19+
use crate::{bool_response, data_response, pagination_response};
1620

1721
pub fn set_routes() -> Router<Arc<AppState>> {
1822
Router::new()
@@ -57,6 +61,7 @@ async fn list(
5761

5862
pagination_response!(res, payload.current, payload.page_size)
5963
}
64+
6065
async fn add(
6166
state: State<Arc<AppState>>,
6267
Json(payload): Json<Model>,
@@ -89,11 +94,23 @@ pub async fn execute(
8994
) -> Result<ResJson<bool>, AppError> {
9095
let data = CollectConfigService::find_by_id(&state.conn, id).await?;
9196

97+
let task_id = Uuid::new_v4().simple();
98+
let st = state.clone();
99+
let log_task = LogTask::new();
100+
let cloned_token = log_task.token.clone();
101+
let mut task = state.log_task.write().await;
102+
task.insert(task_id, log_task);
103+
drop(task);
104+
92105
tokio::task::spawn(async move {
93-
CollectConfigService::execute_task(&state, &data).await;
106+
tokio::select! {
107+
_ = cloned_token.cancelled() => {
108+
// The token was cancelled, task can shut down
109+
debug!("cloned_token {cloned_token:?}");
110+
}
111+
_ = CollectConfigService::execute_task(&st, &data, task_id) => {}
112+
}
94113
});
95-
// https://docs.rs/tokio/1.35.1/tokio/task/index.html#yield_now
96-
// tokio::task::yield_now().await;
97-
98-
Ok(Json(res_template_ok!(Some(true))))
114+
let res: anyhow::Result<bool> = Ok(true);
115+
bool_response!(res)
99116
}

crates/process_web/src/api/collect_log.rs

+32-3
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,23 @@ use axum::{
77
};
88
use serde::Deserialize;
99
use std::sync::Arc;
10+
use tracing::error;
1011
use ts_rs::TS;
12+
use uuid::Uuid;
1113

1214
use crate::api::common::{AppError, AppState, PaginationPayload, ResJson, ResJsonWithPagination};
13-
use crate::entity::collect_log::Model;
15+
use crate::entity::collect_log::{self, Model};
1416
use crate::service::collect_log_service::CollectLogService;
1517
use crate::{bool_response, data_response, pagination_response};
1618

1719
pub fn set_routes() -> Router<Arc<AppState>> {
18-
Router::new()
19-
.route("/find_by_id/:id", post(find_by_id))
20+
Router::new()
21+
.route("/find_by_id/:id", get(find_by_id))
2022
.route("/list", post(list))
2123
.route("/add", post(add))
2224
.route("/update_by_id/:id", post(update_by_id))
2325
.route("/delete/:id", get(del))
26+
.route("/stop_task/:id", get(stop_task))
2427
}
2528

2629
async fn find_by_id(
@@ -81,3 +84,29 @@ async fn del(state: State<Arc<AppState>>, Path(id): Path<i32>) -> Result<ResJson
8184

8285
bool_response!(res)
8386
}
87+
88+
pub async fn stop_task(
89+
state: State<Arc<AppState>>,
90+
Path(id): Path<String>,
91+
) -> Result<ResJson<bool>, AppError> {
92+
let log_task_id = Uuid::parse_str(id.as_str())?.simple();
93+
94+
let log_id = state.stop_log_task(log_task_id).await;
95+
if let Some(log_id) = log_id {
96+
if let Err(err) = CollectLogService::update_by_id(
97+
&state.conn,
98+
log_id,
99+
collect_log::Model {
100+
status: 5,
101+
running_log: "用户手动停止".to_string(),
102+
..Default::default()
103+
},
104+
)
105+
.await
106+
{
107+
error!("{}", err);
108+
}
109+
}
110+
111+
bool_response!(anyhow::Ok::<bool>(true))
112+
}

0 commit comments

Comments
 (0)