Skip to content

Commit 86f43eb

Browse files
author
Greg Soltis
authored
chore(Turborepo): Consolidate Subscriber::watch and setup task (#7714)
### Description - Remove the async task in Subscriber::new, move functionality to an init_watch function - Cleanup based on no longer needing the one-off async task, including removing unnecessary Arcs ### Testing Instructions Existing test suite updated with new arguments Closes TURBO-2626 Co-authored-by: Greg Soltis <Greg Soltis>
1 parent 0a3bf27 commit 86f43eb

File tree

1 file changed

+100
-143
lines changed

1 file changed

+100
-143
lines changed

crates/turborepo-filewatch/src/package_watcher.rs

+100-143
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::{collections::HashMap, sync::Arc};
55

66
use futures::FutureExt;
77
use notify::Event;
8+
use thiserror::Error;
89
use tokio::{
910
join,
1011
sync::{
@@ -90,6 +91,16 @@ impl PackageDiscovery for WatchingPackageDiscovery {
9091
}
9192
}
9293

94+
#[derive(Debug, Error)]
95+
enum PackageWatcherError {
96+
#[error("failed to discover packages {0}")]
97+
Discovery(#[from] discovery::Error),
98+
#[error("failed to resolve package manager {0}")]
99+
PackageManager(#[from] package_manager::Error),
100+
#[error("filewatching shut down, package manager not available")]
101+
Filewatching(watch::error::RecvError),
102+
}
103+
93104
/// Watches the filesystem for changes to packages and package managers.
94105
pub struct PackageWatcher {
95106
// _exit_ch exists to trigger a close on the receiver when an instance
@@ -117,10 +128,10 @@ impl PackageWatcher {
117128
cookie_writer: CookieWriter,
118129
) -> Result<Self, package_manager::Error> {
119130
let (exit_tx, exit_rx) = oneshot::channel();
120-
let subscriber = Subscriber::new(root, recv, backup_discovery, cookie_writer)?;
131+
let subscriber = Subscriber::new(root, backup_discovery, cookie_writer)?;
121132
let package_manager_lazy = subscriber.manager_receiver();
122133
let package_data = subscriber.package_data();
123-
let handle = tokio::spawn(subscriber.watch(exit_rx));
134+
let handle = tokio::spawn(subscriber.watch(exit_rx, recv));
124135
Ok(Self {
125136
_exit_tx: exit_tx,
126137
_handle: handle,
@@ -179,23 +190,15 @@ impl PackageWatcher {
179190
/// The underlying task that listens to file system events and updates the
180191
/// internal package state.
181192
struct Subscriber<T: PackageDiscovery> {
182-
/// we need to hold on to this. dropping it will close the downstream
183-
/// data dependencies
184-
#[allow(clippy::type_complexity)]
185-
#[allow(dead_code)]
186-
file_event_receiver_tx:
187-
Arc<watch::Sender<Option<broadcast::Receiver<Result<Event, NotifyError>>>>>,
188-
189-
file_event_receiver_lazy: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
190-
backup_discovery: Arc<T>,
193+
backup_discovery: T,
191194

192195
repo_root: AbsoluteSystemPathBuf,
193196
root_package_json_path: AbsoluteSystemPathBuf,
194197

195198
// package manager data
196-
package_manager_tx: Arc<watch::Sender<Option<PackageManagerState>>>,
199+
package_manager_tx: watch::Sender<Option<PackageManagerState>>,
197200
package_manager_lazy: CookiedOptionalWatch<PackageManagerState, ()>,
198-
package_data_tx: Arc<watch::Sender<Option<HashMap<AbsoluteSystemPathBuf, WorkspaceData>>>>,
201+
package_data_tx: watch::Sender<Option<HashMap<AbsoluteSystemPathBuf, WorkspaceData>>>,
199202
package_data_lazy: CookiedOptionalWatch<HashMap<AbsoluteSystemPathBuf, WorkspaceData>, ()>,
200203
cookie_tx: CookieRegister,
201204
}
@@ -217,109 +220,18 @@ impl<T: PackageDiscovery + Send + Sync + 'static> Subscriber<T> {
217220
/// data up to date.
218221
fn new(
219222
repo_root: AbsoluteSystemPathBuf,
220-
mut recv: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
221223
backup_discovery: T,
222224
writer: CookieWriter,
223225
) -> Result<Self, Error> {
224226
let (package_data_tx, cookie_tx, package_data_lazy) = CookiedOptionalWatch::new(writer);
225-
let package_data_tx = Arc::new(package_data_tx);
226227
let (package_manager_tx, package_manager_lazy) = package_data_lazy.new_sibling();
227-
let package_manager_tx = Arc::new(package_manager_tx);
228228

229229
// we create a second optional watch here so that we can ensure it is ready and
230230
// pass it down stream after the initial discovery, otherwise our package
231231
// discovery watcher will consume events before we have our initial state
232-
let (file_event_receiver_tx, file_event_receiver_lazy) = OptionalWatch::new();
233-
let file_event_receiver_tx = Arc::new(file_event_receiver_tx);
234-
235-
let backup_discovery = Arc::new(backup_discovery);
236-
237232
let package_json_path = repo_root.join_component("package.json");
238233

239-
let _task = tokio::spawn({
240-
let package_data_tx = package_data_tx.clone();
241-
let manager_tx = package_manager_tx.clone();
242-
let backup_discovery = backup_discovery.clone();
243-
let repo_root = repo_root.clone();
244-
let package_json_path = package_json_path.clone();
245-
let recv_tx = file_event_receiver_tx.clone();
246-
async move {
247-
// wait for the watcher, so we can process events that happen during discovery
248-
let Ok(recv) = recv.get().await.map(|r| r.resubscribe()) else {
249-
// if we get here, it means that file watching has not started, so we should
250-
// just report that the package watcher is not available
251-
tracing::debug!("file watching shut down, package watcher not available");
252-
return;
253-
};
254-
255-
let initial_discovery = backup_discovery.discover_packages().await;
256-
257-
let Ok(initial_discovery) = initial_discovery else {
258-
// if initial discovery fails, there is nothing we can do. we should just report
259-
// that the package watcher is not available
260-
//
261-
// NOTE: in the future, if we decide to differentiate between 'not ready' and
262-
// unavailable, we MUST update the status here to unavailable or the client will
263-
// hang
264-
return;
265-
};
266-
267-
let Ok((workspace_config_path, filter)) = Self::update_package_manager(
268-
&initial_discovery.package_manager,
269-
&repo_root,
270-
&package_json_path,
271-
) else {
272-
// similar story here, if the package manager cannot be read, we should just
273-
// report that the package watcher is not available
274-
return;
275-
};
276-
277-
// now that the two pieces of data are available, we can send the package
278-
// manager and set the packages
279-
280-
let state = PackageManagerState {
281-
manager: initial_discovery.package_manager,
282-
filter: Arc::new(filter),
283-
workspace_config_path,
284-
};
285-
286-
// if either of these fail, it means that there are no more subscribers and we
287-
// should just ignore it, since we are likely closing
288-
let manager_listeners = if manager_tx.send(Some(state)).is_err() {
289-
tracing::debug!("no listeners for package manager");
290-
false
291-
} else {
292-
true
293-
};
294-
295-
let package_data_listeners = if package_data_tx
296-
.send(Some(
297-
initial_discovery
298-
.workspaces
299-
.into_iter()
300-
.map(|p| (p.package_json.parent().expect("non-root").to_owned(), p))
301-
.collect::<HashMap<_, _>>(),
302-
))
303-
.is_err()
304-
{
305-
tracing::debug!("no listeners for package data");
306-
false
307-
} else {
308-
true
309-
};
310-
311-
// if we have no listeners for either, we should just exit
312-
if manager_listeners || package_data_listeners {
313-
_ = recv_tx.send(Some(recv));
314-
} else {
315-
tracing::debug!("no listeners for file events, exiting");
316-
}
317-
}
318-
});
319-
320234
Ok(Self {
321-
file_event_receiver_tx,
322-
file_event_receiver_lazy,
323235
backup_discovery,
324236
repo_root,
325237
root_package_json_path: package_json_path,
@@ -331,45 +243,90 @@ impl<T: PackageDiscovery + Send + Sync + 'static> Subscriber<T> {
331243
})
332244
}
333245

334-
async fn watch(mut self, exit_rx: oneshot::Receiver<()>) {
335-
let process = async move {
336-
tracing::debug!("starting package watcher");
337-
let Ok(mut recv) = self
338-
.file_event_receiver_lazy
339-
.get()
340-
.await
341-
.map(|r| r.resubscribe())
342-
else {
343-
// if the channel is closed, we should just exit
344-
tracing::debug!("file watcher shut down, exiting");
345-
return;
346-
};
347-
348-
tracing::debug!("package watcher ready");
349-
loop {
350-
let file_event = recv.recv().await;
351-
match file_event {
352-
Ok(Ok(event)) => match self.handle_file_event(&event).await {
353-
Ok(()) => {}
354-
Err(()) => {
355-
tracing::debug!("package watching is closing, exiting");
356-
return;
357-
}
358-
},
359-
// if we get an error, we need to re-discover the packages
360-
Ok(Err(_)) => self.rediscover_packages().await,
361-
Err(RecvError::Closed) => return,
362-
// if we end up lagging, warn and rediscover packages
363-
Err(RecvError::Lagged(count)) => {
364-
tracing::warn!("lagged behind {count} processing file watching events");
365-
self.rediscover_packages().await;
246+
async fn init_watch(
247+
&mut self,
248+
mut recv: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
249+
) -> Result<broadcast::Receiver<Result<Event, NotifyError>>, PackageWatcherError> {
250+
// wait for the watcher, so we can process events that happen during discovery
251+
let recv = recv
252+
.get()
253+
.await
254+
.map(|r| r.resubscribe())
255+
.map_err(PackageWatcherError::Filewatching)?;
256+
257+
// if initial discovery fails, there is nothing we can do. we should just report
258+
// that the package watcher is not available
259+
//
260+
// NOTE: in the future, if we decide to differentiate between 'not ready' and
261+
// unavailable, we MUST update the status here to unavailable or the client will
262+
// hang
263+
let initial_discovery = self.backup_discovery.discover_packages().await?;
264+
265+
let (workspace_config_path, filter) = Self::update_package_manager(
266+
&initial_discovery.package_manager,
267+
&self.repo_root,
268+
&self.root_package_json_path,
269+
)?;
270+
271+
// now that the two pieces of data are available, we can send the package
272+
// manager and set the packages
273+
274+
let state = PackageManagerState {
275+
manager: initial_discovery.package_manager,
276+
filter: Arc::new(filter),
277+
workspace_config_path,
278+
};
279+
280+
// if either of these fail, it means that there are no more subscribers and we
281+
// should just ignore it, since we are likely closing
282+
let _ = self.package_manager_tx.send(Some(state));
283+
let _ = self.package_data_tx.send(Some(
284+
initial_discovery
285+
.workspaces
286+
.into_iter()
287+
.map(|p| (p.package_json.parent().expect("non-root").to_owned(), p))
288+
.collect::<HashMap<_, _>>(),
289+
));
290+
291+
Ok(recv)
292+
}
293+
294+
async fn watch_process(
295+
mut self,
296+
recv: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
297+
) -> Result<(), PackageWatcherError> {
298+
tracing::debug!("starting package watcher");
299+
let mut recv = self.init_watch(recv).await.unwrap();
300+
301+
tracing::debug!("package watcher ready");
302+
loop {
303+
let file_event = recv.recv().await;
304+
match file_event {
305+
Ok(Ok(event)) => match self.handle_file_event(&event).await {
306+
Ok(()) => {}
307+
Err(()) => {
308+
tracing::debug!("package watching is closing, exiting");
309+
return Ok(());
366310
}
311+
},
312+
// if we get an error, we need to re-discover the packages
313+
Ok(Err(_)) => self.rediscover_packages().await,
314+
Err(RecvError::Closed) => return Ok(()),
315+
// if we end up lagging, warn and rediscover packages
316+
Err(RecvError::Lagged(count)) => {
317+
tracing::warn!("lagged behind {count} processing file watching events");
318+
self.rediscover_packages().await;
367319
}
368320
}
369-
};
370-
371-
// respond to changes
321+
}
322+
}
372323

324+
async fn watch(
325+
self,
326+
exit_rx: oneshot::Receiver<()>,
327+
recv: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
328+
) {
329+
let process = tokio::spawn(self.watch_process(recv));
373330
tokio::select! {
374331
biased;
375332
_ = exit_rx => {
@@ -773,11 +730,11 @@ mod test {
773730
CookieWriter::new_with_default_cookie_dir(&root, Duration::from_secs(2), rx.clone());
774731

775732
let subscriber =
776-
Subscriber::new(root.clone(), rx, mock_discovery, cookie_writer.clone()).unwrap();
733+
Subscriber::new(root.clone(), mock_discovery, cookie_writer.clone()).unwrap();
777734

778735
let mut package_data = subscriber.package_data();
779736

780-
let _handle = tokio::spawn(subscriber.watch(exit_rx));
737+
let _handle = tokio::spawn(subscriber.watch(exit_rx, rx));
781738

782739
tx.send(Ok(notify::Event {
783740
kind: notify::EventKind::Create(notify::event::CreateKind::File),
@@ -978,11 +935,11 @@ mod test {
978935
let cookie_writer =
979936
CookieWriter::new_with_default_cookie_dir(&root, Duration::from_secs(2), rx.clone());
980937
let subscriber =
981-
Subscriber::new(root.clone(), rx, mock_discovery, cookie_writer.clone()).unwrap();
938+
Subscriber::new(root.clone(), mock_discovery, cookie_writer.clone()).unwrap();
982939

983940
let mut package_data = subscriber.package_data();
984941

985-
let _handle = tokio::spawn(subscriber.watch(exit_rx));
942+
let _handle = tokio::spawn(subscriber.watch(exit_rx, rx));
986943

987944
let (data, _) = join! {
988945
package_data.get(),

0 commit comments

Comments
 (0)