Skip to content

Commit 63af3fe

Browse files
committed
feat(cli): basic retry on failure
1 parent 15fb9a8 commit 63af3fe

File tree

10 files changed

+145
-943
lines changed

10 files changed

+145
-943
lines changed

examples/deploy/deploy.mjs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import * as path from "path";
1515
// import { tgDeploy } from "../../typegraph/node/sdk/dist/tg_deploy.js";
1616

1717

18-
const tg = typegraph({
18+
const tg = await typegraph({
1919
name: "deploy-example-node",
2020
disableAutoSerialization: true // disable print
2121
}, (g) => {
@@ -83,7 +83,7 @@ const auth = new BasicAuth("admin", "password");
8383

8484
tgDeploy(tg, {
8585
baseUrl,
86-
cliVersion: "0.3.4",
86+
cliVersion: "0.3.5-0",
8787
auth,
8888
secrets: {
8989
TG_DEPLOY_EXAMPLE_NODE_POSTGRES: "postgresql://postgres:password@localhost:5432/db?schema=e2e7894"

examples/deploy/deploy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def deploy_example_python(g: Graph):
8686
TypegraphDeployParams(
8787
base_url="http://localhost:7890",
8888
auth=auth,
89-
cli_version="0.3.4",
89+
cli_version="0.3.5-0",
9090
artifacts_config=artifacts_config,
9191
secrets={
9292
"TG_DEPLOY_EXAMPLE_PYTHON_POSTGRES": "postgresql://postgres:password@localhost:5432/db?schema=e2e7894"

meta-cli/src/cli/deploy.rs

Lines changed: 44 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::deploy::actors::discovery::DiscoveryActor;
1414
use crate::deploy::actors::loader::{
1515
self, LoaderActor, LoaderEvent, ReloadModule, ReloadReason, StopBehavior,
1616
};
17-
use crate::deploy::actors::pusher::Push;
17+
use crate::deploy::actors::pusher::PushResult;
1818
use crate::deploy::actors::watcher::WatcherActor;
1919
use actix::prelude::*;
2020
use anyhow::{bail, Context, Result};
@@ -188,11 +188,7 @@ impl Action for DeploySubcommand {
188188

189189
mod default_mode {
190190
//! non-watch mode
191-
192-
use crate::deploy::actors::{
193-
loader::LoadModule,
194-
push_manager::{PushManager, PushManagerActor, PushManagerBuilder},
195-
};
191+
use default_mode::actors::loader::LoadModule;
196192

197193
use super::*;
198194

@@ -201,7 +197,6 @@ mod default_mode {
201197
console: Addr<ConsoleActor>,
202198
loader: Addr<LoaderActor>,
203199
loader_event_rx: mpsc::UnboundedReceiver<LoaderEvent>,
204-
pusher: Addr<PushManagerActor>,
205200
}
206201

207202
impl DefaultMode {
@@ -223,21 +218,18 @@ mod default_mode {
223218
.auto_stop()
224219
.start();
225220

226-
let pusher = PushManagerBuilder::new(console.clone()).start();
227-
228221
Ok(Self {
229222
deploy,
230223
console,
231224
loader,
232225
loader_event_rx,
233-
pusher,
234226
})
235227
}
236228

237229
pub async fn run(self) -> Result<()> {
238230
log::debug!("file: {:?}", self.deploy.file);
239231
let _discovery = if let Some(file) = self.deploy.file.clone() {
240-
self.loader.do_send(LoadModule(file));
232+
self.loader.do_send(LoadModule(file.to_path_buf().into()));
241233
None
242234
} else {
243235
Some(
@@ -252,36 +244,32 @@ mod default_mode {
252244
};
253245

254246
let loader = self.loader.clone();
255-
let pusher = self.pusher.clone();
256247
self.push_loaded_typegraphs();
257248

258-
let ret = match loader::stopped(loader).await {
249+
match loader::stopped(loader).await {
259250
Ok(StopBehavior::Restart) => unreachable!("LoaderActor should not restart"),
260251
Ok(StopBehavior::ExitSuccess) => Ok(()),
261252
Ok(StopBehavior::ExitFailure(msg)) => bail!("{msg}"),
262253
Err(e) => panic!("Loader actor stopped unexpectedly: {e:?}"),
263-
};
264-
265-
log::debug!("loader stopped, stopping pusher");
266-
let result = pusher.stop().await;
267-
if let Err(e) = result {
268-
System::current().stop_with_code(1);
269-
return Err(e);
270254
}
271-
ret
272255
}
273256

274257
fn push_loaded_typegraphs(self) {
275-
let pusher = self.pusher.clone();
276258
let mut event_rx = self.loader_event_rx;
277-
278259
Arbiter::current().spawn(async move {
279260
while let Some(event) = event_rx.recv().await {
280261
match event {
281-
LoaderEvent::Typegraph(tg) => {
282-
// TODO await -- no queue
283-
pusher.do_send(Push::new(tg.into()));
284-
}
262+
LoaderEvent::Typegraph(tg) => match tg.get_response_or_fail() {
263+
Ok(res) => {
264+
let push =
265+
PushResult::new(self.console.clone(), res.as_ref().clone())
266+
.unwrap();
267+
if let Err(e) = push.finalize() {
268+
panic!("{}", e.to_string());
269+
}
270+
}
271+
Err(e) => panic!("{}", e.to_string()),
272+
},
285273
LoaderEvent::Stopped(b) => {
286274
if let StopBehavior::ExitFailure(msg) = b {
287275
panic!("{msg}");
@@ -299,7 +287,7 @@ mod default_mode {
299287
mod watch_mode {
300288
use std::time::Duration;
301289

302-
use crate::deploy::actors::push_manager::{PushManager, PushManagerActor, PushManagerBuilder};
290+
use watch_mode::actors::loader::LoadModule;
303291

304292
use super::*;
305293

@@ -352,33 +340,25 @@ mod watch_mode {
352340
)?
353341
.start();
354342

355-
let pusher = PushManagerBuilder::new(console.clone())
356-
.linear_backoff(Duration::from_secs(5), 3)
357-
.start();
358-
359343
let actor_system = ActorSystem {
360344
console: console.clone(),
361345
watcher,
362346
loader: loader.clone(),
363-
pusher: pusher.clone(),
364347
};
365348

366-
actor_system.push_loaded_typegraphs(loader_event_rx);
349+
actor_system.handle_loaded_typegraphs(loader_event_rx);
367350
actor_system.handle_watch_events(watch_event_rx);
368351
actor_system.update_ctrlc_handler(ctrlc_handler_data.clone());
369352

370353
// TODO wait for push lifecycle
371354
match loader::stopped(loader).await {
372355
Ok(StopBehavior::ExitSuccess) => {
373-
pusher.stop().await?;
374356
break;
375357
}
376358
Ok(StopBehavior::Restart) => {
377-
pusher.stop().await?;
378359
continue;
379360
}
380361
Ok(StopBehavior::ExitFailure(_)) => {
381-
pusher.stop().await?;
382362
break;
383363
}
384364
Err(e) => {
@@ -394,19 +374,41 @@ mod watch_mode {
394374
console: Addr<ConsoleActor>,
395375
watcher: Addr<WatcherActor>,
396376
loader: Addr<LoaderActor>,
397-
pusher: Addr<PushManagerActor>,
398377
}
399378

400379
impl ActorSystem {
401-
fn push_loaded_typegraphs(&self, event_rx: mpsc::UnboundedReceiver<LoaderEvent>) {
402-
let pusher = self.pusher.clone();
380+
fn handle_loaded_typegraphs(&self, event_rx: mpsc::UnboundedReceiver<LoaderEvent>) {
381+
let console = self.console.clone();
382+
let loader = self.loader.clone();
403383
Arbiter::current().spawn(async move {
404384
let mut event_rx = event_rx;
405385
while let Some(event) = event_rx.recv().await {
406386
match event {
407387
LoaderEvent::Typegraph(tg) => {
408-
pusher.do_send(Push::new(tg.into()));
409-
// TODO update deps
388+
let response = ServerStore::get_response_or_fail(&tg.path)
389+
.unwrap()
390+
.as_ref()
391+
.to_owned();
392+
match PushResult::new(console.clone(), response) {
393+
Ok(push) => {
394+
if let Err(e) = push.finalize() {
395+
panic!("{}", e.to_string());
396+
}
397+
}
398+
Err(_) => {
399+
// very basic retry
400+
// TODO: implement something similar to the old 3 retries
401+
let wait_ms = 3000;
402+
console.warning(format!(
403+
"Retrying {:?} after {}",
404+
tg.path.display(),
405+
wait_ms,
406+
));
407+
tokio::time::sleep(Duration::from_millis(wait_ms)).await;
408+
409+
loader.do_send(LoadModule(Arc::new(tg.path)));
410+
}
411+
}
410412
}
411413
LoaderEvent::Stopped(b) => {
412414
if let StopBehavior::ExitFailure(msg) = b {
@@ -427,7 +429,6 @@ mod watch_mode {
427429
let console = self.console.clone();
428430
let watcher = self.watcher.clone();
429431
let loader = self.loader.clone();
430-
let pusher = self.pusher.clone();
431432
Arbiter::current().spawn(async move {
432433
let mut watch_event_rx = watch_event_rx;
433434
while let Some(event) = watch_event_rx.recv().await {
@@ -441,15 +442,12 @@ mod watch_mode {
441442
watcher.do_send(actors::watcher::Stop);
442443
}
443444
E::TypegraphModuleChanged { typegraph_module } => {
444-
pusher.cancel_all_from(&typegraph_module).await.unwrap();
445445
loader.do_send(ReloadModule(
446446
typegraph_module.into(),
447447
ReloadReason::FileChanged,
448448
));
449449
}
450450
E::TypegraphModuleDeleted { typegraph_module } => {
451-
// TODO registry
452-
pusher.cancel_all_from(&typegraph_module).await.unwrap();
453451
// TODO internally by the watcher??
454452
watcher.do_send(actors::watcher::RemoveTypegraph(
455453
typegraph_module.clone(),
@@ -460,7 +458,6 @@ mod watch_mode {
460458
typegraph_module,
461459
dependency_path,
462460
} => {
463-
pusher.cancel_all_from(&typegraph_module).await.unwrap();
464461
loader.do_send(ReloadModule(
465462
typegraph_module.into(),
466463
ReloadReason::DependencyChanged(dependency_path),

meta-cli/src/codegen/deno.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,11 @@ mod tests {
497497
.auto_stop()
498498
.start();
499499
loader.do_send(LoadModule(
500-
test_folder.join(&typegraph_test).as_path().into(),
500+
test_folder
501+
.join(&typegraph_test)
502+
.as_path()
503+
.to_owned()
504+
.into(),
501505
));
502506

503507
let mut event_rx = event_rx;

meta-cli/src/deploy/actors/loader.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
22
// SPDX-License-Identifier: MPL-2.0
33

4-
use std::path::{Path, PathBuf};
4+
use std::path::PathBuf;
55
use std::sync::atomic::{AtomicU32, Ordering};
66
use std::sync::Arc;
77

@@ -92,18 +92,20 @@ impl LoaderActor {
9292

9393
impl LoaderActor {
9494
fn loader_pool(config: Arc<Config>, max_parallel_loads: usize) -> LoaderPool {
95-
LoaderPool::new(config, max_parallel_loads)
95+
LoaderPool::new(config.base_dir.clone(), max_parallel_loads)
9696
}
9797

98-
fn load_module(&self, self_addr: Addr<Self>, path: Arc<Path>) {
98+
fn load_module(&self, self_addr: Addr<Self>, path: Arc<PathBuf>) {
9999
let loader_pool = self.loader_pool.clone();
100100
let console = self.console.clone();
101101
let counter = self.counter.clone();
102102
Arbiter::current().spawn(async move {
103103
// TODO error handling?
104104
let loader = loader_pool.get_loader().await.unwrap();
105105
match loader.load_module(path.clone()).await {
106-
Ok(tgs_infos) => self_addr.do_send(LoadedModule(path, tgs_infos)),
106+
Ok(tgs_infos) => {
107+
self_addr.do_send(LoadedModule(path.as_ref().to_owned().into(), tgs_infos))
108+
}
107109
Err(e) => {
108110
if counter.is_some() {
109111
// auto stop
@@ -125,11 +127,11 @@ pub enum ReloadReason {
125127

126128
#[derive(Message)]
127129
#[rtype(result = "()")]
128-
pub struct LoadModule(pub Arc<Path>);
130+
pub struct LoadModule(pub Arc<PathBuf>);
129131

130132
#[derive(Message)]
131133
#[rtype(result = "()")]
132-
pub struct ReloadModule(pub Arc<Path>, pub ReloadReason);
134+
pub struct ReloadModule(pub Arc<PathBuf>, pub ReloadReason);
133135

134136
#[derive(Message)]
135137
#[rtype(result = "()")]
@@ -141,7 +143,7 @@ struct SetStoppedTx(oneshot::Sender<StopBehavior>);
141143

142144
#[derive(Message)]
143145
#[rtype(result = "()")]
144-
struct LoadedModule(pub Arc<Path>, TypegraphInfos);
146+
struct LoadedModule(pub Arc<PathBuf>, TypegraphInfos);
145147

146148
impl Actor for LoaderActor {
147149
type Context = Context<Self>;

meta-cli/src/deploy/actors/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,5 @@
44
pub mod console;
55
pub mod discovery;
66
pub mod loader;
7-
pub mod push_manager;
87
pub mod pusher;
98
pub mod watcher;

0 commit comments

Comments
 (0)