|
| 1 | +use std::path::{Path, PathBuf}; |
| 2 | +use std::time::Duration; |
| 3 | + |
| 4 | +use futures::prelude::*; |
| 5 | +use notify::Config; |
| 6 | +use notify::EventKind; |
| 7 | +use notify::PollWatcher; |
| 8 | +use notify::RecursiveMode; |
| 9 | +use notify::Watcher; |
| 10 | +use notify::event::DataChange; |
| 11 | +use notify::event::MetadataKind; |
| 12 | +use notify::event::ModifyKind; |
| 13 | +use tokio::sync::mpsc; |
| 14 | +use tokio::sync::mpsc::error::TrySendError; |
| 15 | + |
| 16 | +#[cfg(not(test))] |
| 17 | +const DEFAULT_WATCH_DURATION: Duration = Duration::from_secs(3); |
| 18 | + |
| 19 | +#[cfg(test)] |
| 20 | +const DEFAULT_WATCH_DURATION: Duration = Duration::from_millis(100); |
| 21 | + |
| 22 | +/// Creates a stream events whenever the file at the path has changes. The stream never terminates |
| 23 | +/// and must be dropped to finish watching. |
| 24 | +/// |
| 25 | +/// # Arguments |
| 26 | +/// |
| 27 | +/// * `path`: The file to watch |
| 28 | +/// |
| 29 | +/// returns: impl Stream<Item=()> |
| 30 | +/// |
| 31 | +pub(crate) fn watch(path: &Path) -> impl Stream<Item = ()> + use<> { |
| 32 | + watch_with_duration(path, DEFAULT_WATCH_DURATION) |
| 33 | +} |
| 34 | + |
| 35 | +#[allow(clippy::panic)] // TODO: code copied from router contained existing panics |
| 36 | +fn watch_with_duration(path: &Path, duration: Duration) -> impl Stream<Item = ()> + use<> { |
| 37 | + // Due to the vagaries of file watching across multiple platforms, instead of watching the |
| 38 | + // supplied path (file), we are going to watch the parent (directory) of the path. |
| 39 | + let config_file_path = PathBuf::from(path); |
| 40 | + let watched_path = config_file_path.clone(); |
| 41 | + |
| 42 | + let (watch_sender, watch_receiver) = mpsc::channel(1); |
| 43 | + let watch_receiver_stream = tokio_stream::wrappers::ReceiverStream::new(watch_receiver); |
| 44 | + // We can't use the recommended watcher, because there's just too much variation across |
| 45 | + // platforms and file systems. We use the Poll Watcher, which is implemented consistently |
| 46 | + // across all platforms. Less reactive than other mechanisms, but at least it's predictable |
| 47 | + // across all environments. We compare contents as well, which reduces false positives with |
| 48 | + // some additional processing burden. |
| 49 | + let config = Config::default() |
| 50 | + .with_poll_interval(duration) |
| 51 | + .with_compare_contents(true); |
| 52 | + let mut watcher = PollWatcher::new( |
| 53 | + move |res: Result<notify::Event, notify::Error>| match res { |
| 54 | + Ok(event) => { |
| 55 | + // The two kinds of events of interest to use are writes to the metadata of a |
| 56 | + // watched file and changes to the data of a watched file |
| 57 | + if matches!( |
| 58 | + event.kind, |
| 59 | + EventKind::Modify(ModifyKind::Metadata(MetadataKind::WriteTime)) |
| 60 | + | EventKind::Modify(ModifyKind::Data(DataChange::Any)) |
| 61 | + ) && event.paths.contains(&watched_path) |
| 62 | + { |
| 63 | + loop { |
| 64 | + match watch_sender.try_send(()) { |
| 65 | + Ok(_) => break, |
| 66 | + Err(err) => { |
| 67 | + tracing::warn!( |
| 68 | + "could not process file watch notification. {}", |
| 69 | + err.to_string() |
| 70 | + ); |
| 71 | + if matches!(err, TrySendError::Full(_)) { |
| 72 | + std::thread::sleep(Duration::from_millis(50)); |
| 73 | + } else { |
| 74 | + panic!("event channel failed: {err}"); |
| 75 | + } |
| 76 | + } |
| 77 | + } |
| 78 | + } |
| 79 | + } |
| 80 | + } |
| 81 | + Err(e) => tracing::error!("event error: {:?}", e), |
| 82 | + }, |
| 83 | + config, |
| 84 | + ) |
| 85 | + .unwrap_or_else(|_| panic!("could not create watch on: {config_file_path:?}")); |
| 86 | + watcher |
| 87 | + .watch(&config_file_path, RecursiveMode::NonRecursive) |
| 88 | + .unwrap_or_else(|_| panic!("could not watch: {config_file_path:?}")); |
| 89 | + // Tell watchers once they should read the file once, |
| 90 | + // then listen to fs events. |
| 91 | + stream::once(future::ready(())) |
| 92 | + .chain(watch_receiver_stream) |
| 93 | + .chain(stream::once(async move { |
| 94 | + // This exists to give the stream ownership of the hotwatcher. |
| 95 | + // Without it hotwatch will get dropped and the stream will terminate. |
| 96 | + // This code never actually gets run. |
| 97 | + // The ideal would be that hotwatch implements a stream and |
| 98 | + // therefore we don't need this hackery. |
| 99 | + drop(watcher); |
| 100 | + })) |
| 101 | + .boxed() |
| 102 | +} |
0 commit comments