Skip to content

Commit e5b4711

Browse files
authored
Wait for jobs rework (#140)
1 parent b0dcc18 commit e5b4711

9 files changed

+79
-50
lines changed

R/clusterFunctions.R

+1-1
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ cfReadBrewTemplate = function(template, comment.string = NA_character_) {
153153
"!DEBUG [cfReadBrewTemplate]: Parsing template from string"
154154
lines = stri_trim_both(stri_split_lines(template)[[1L]])
155155
} else {
156-
"!DEBUG [cfReadBrewTemplate]: Parsing template form file '`template`'"
156+
"!DEBUG [cfReadBrewTemplate]: Parsing template file '`template`'"
157157
lines = stri_trim_both(readLines(template))
158158
}
159159

R/filenames.R

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ getLogFiles = function(reg, ids) {
3030
}
3131

3232
getJobFiles = function(reg, hash) {
33-
fp(dir(reg, "jobs"), sprintf("%s.rds", hash))
33+
fp(reg$file.dir, "jobs", sprintf("%s.rds", hash))
3434
}
3535

3636
getExternalDirs = function(reg, ids) {

R/findJobs.R

+7
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,13 @@ findErrors = function(ids = NULL, reg = getDefaultRegistry()) {
220220
}
221221

222222

223+
# used in waitForJobs: find jobs which are done or error
224+
.findTerminated = function(reg, ids = NULL) {
225+
done = NULL
226+
filter(reg$status, ids, c("job.id", "done"))[!is.na(done), "job.id"]
227+
}
228+
229+
223230
#' @export
224231
#' @rdname findJobs
225232
findOnSystem = function(ids = NULL, reg = getDefaultRegistry()) {

R/runOSCommand.R

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ runOSCommand = function(sys.cmd, sys.args = character(0L), stdin = "", nodename
2929
assertString(nodename, min.chars = 1L)
3030

3131
if (!isLocalHost(nodename)) {
32-
sys.args = c("-q", nodename, shQuote(stri_flatten(c(sys.cmd, sys.args), " ")))
32+
sys.args = c("-q", nodename, sys.cmd, sys.args)
3333
sys.cmd = "ssh"
3434
}
3535

R/waitForJobs.R

+43-44
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@
1717
#' \code{FALSE}. This argument may be required on some systems where, e.g.,
1818
#' expired jobs or jobs on hold are problematic to detect. If you don't want
1919
#' a timeout, set this to \code{Inf}. Default is \code{604800} (one week).
20+
#' @param expire.after [\code{integer(1)}]\cr
21+
#' Jobs count as \dQuote{expired} if they are not found on the system but have not communicated back
22+
#' their results (or error message). This frequently happens on managed system if the scheduler kills
23+
#' a job because the job has hit the walltime or request more memory than reserved.
24+
#' On the other hand, network file systems often require several seconds for new files to be found,
25+
#' which can lead to false positives in the detection heuristic.
26+
#' \code{waitForJobs} treats such jobs as expired after they have not been detected on the system
27+
#' for \code{expire.after} iterations (default 3 iterations).
2028
#' @param stop.on.error [\code{logical(1)}]\cr
2129
#' Immediately cancel if a job terminates with an error? Default is
2230
#' \code{FALSE}.
@@ -25,84 +33,75 @@
2533
#' successfully and \code{FALSE} if either the timeout is reached or at least
2634
#' one job terminated with an exception.
2735
#' @export
28-
waitForJobs = function(ids = NULL, sleep = NULL, timeout = 604800, stop.on.error = FALSE, reg = getDefaultRegistry()) {
36+
waitForJobs = function(ids = NULL, sleep = NULL, timeout = 604800, expire.after = 3L, stop.on.error = FALSE, reg = getDefaultRegistry()) {
2937
assertRegistry(reg, writeable = FALSE, sync = TRUE)
3038
assertNumber(timeout, lower = 0)
39+
assertCount(expire.after, positive = TRUE)
3140
assertFlag(stop.on.error)
3241
sleep = getSleepFunction(reg, sleep)
3342
ids = convertIds(reg, ids, default = .findSubmitted(reg = reg))
3443

35-
.findNotTerminated = function(reg, ids = NULL) {
36-
done = NULL
37-
filter(reg$status, ids, c("job.id", "done"))[is.na(done), "job.id"]
38-
}
39-
4044
if (nrow(.findNotSubmitted(ids = ids, reg = reg)) > 0L) {
4145
warning("Cannot wait for unsubmitted jobs. Removing from ids.")
4246
ids = ids[.findSubmitted(ids = ids, reg = reg), nomatch = 0L]
4347
}
4448

45-
n.jobs = nrow(ids)
46-
if (n.jobs == 0L)
49+
if (nrow(ids) == 0L) {
4750
return(TRUE)
51+
}
4852

49-
batch.ids = getBatchIds(reg)
50-
"!DEBUG [waitForJobs]: Using `nrow(ids)` ids and `nrow(batch.ids)` initial batch ids"
53+
terminated = on.sys = expire.counter = NULL
54+
ids$terminated = FALSE
55+
ids$on.sys = FALSE
56+
ids$expire.counter = 0L
5157

5258
timeout = Sys.time() + timeout
53-
ids.disappeared = noIds()
54-
pb = makeProgressBar(total = n.jobs, format = "Waiting (S::system R::running D::done E::error) [:bar] :percent eta: :eta")
55-
i = 1L
59+
pb = makeProgressBar(total = nrow(ids), format = "Waiting (S::system R::running D::done E::error) [:bar] :percent eta: :eta")
60+
i = 0L
5661

5762
repeat {
58-
# case 1: all jobs terminated -> nothing on system
59-
ids.nt = .findNotTerminated(reg, ids)
60-
if (nrow(ids.nt) == 0L) {
63+
### case 1: all jobs terminated -> nothing on system
64+
ids[.findTerminated(reg, ids), "terminated" := TRUE]
65+
if (ids[!(terminated), .N] == 0L) {
6166
"!DEBUG [waitForJobs]: All jobs terminated"
6267
pb$update(1)
6368
waitForResults(reg, ids)
6469
return(nrow(.findErrors(reg, ids)) == 0L)
6570
}
6671

67-
stats = getStatusTable(ids = ids, batch.ids = batch.ids, reg = reg)
68-
pb$update((n.jobs - nrow(ids.nt)) / n.jobs, tokens = as.list(stats))
69-
70-
# case 2: there are errors and stop.on.error is TRUE
72+
### case 2: there are errors and stop.on.error is TRUE
7173
if (stop.on.error && nrow(.findErrors(reg, ids)) > 0L) {
7274
"!DEBUG [waitForJobs]: Errors found and stop.on.error is TRUE"
7375
pb$update(1)
7476
return(FALSE)
7577
}
7678

77-
# case 3: we have reached a timeout
78-
if (Sys.time() > timeout) {
79+
batch.ids = getBatchIds(reg)
80+
ids[, "on.sys" := FALSE][.findOnSystem(reg, ids, batch.ids = batch.ids), "on.sys" := TRUE]
81+
ids[!(on.sys) & !(terminated), "expire.counter" := expire.counter + 1L]
82+
stats = getStatusTable(ids = ids, batch.ids = batch.ids, reg = reg)
83+
pb$update(mean(ids$terminated), tokens = as.list(stats))
84+
"!DEBUG [waitForJobs]: batch.ids: `stri_flatten(batch.ids$batch.id, ',')`"
85+
86+
### case 3: jobs disappeared, we cannot find them on the system in [expire.after] iterations
87+
if (ids[!(terminated) & expire.counter > expire.after, .N] > 0L) {
88+
warning("Some jobs disappeared from the system")
7989
pb$update(1)
80-
warning("Timeout reached")
90+
waitForResults(reg, ids)
8191
return(FALSE)
8292
}
8393

84-
# case 4: jobs disappeared, we cannot find them on the system
85-
# heuristic:
86-
# job is not terminated, not on system and has not been on the system
87-
# in the previous iteration
88-
ids.on.sys = .findOnSystem(reg, ids, batch.ids = batch.ids)
89-
if (nrow(ids.disappeared) > 0L) {
90-
if (nrow(ids.nt[!ids.on.sys, on = "job.id"][ids.disappeared, on = "job.id", nomatch = 0L]) > 0L) {
91-
warning("Some jobs disappeared from the system")
92-
pb$update(1)
93-
waitForResults(reg, ids)
94-
return(FALSE)
95-
}
96-
}
97-
98-
ids.disappeared = ids[!ids.on.sys, on = "job.id"]
99-
"!DEBUG [waitForJobs]: `nrow(ids.disappeared)` jobs disappeared"
100-
94+
# case 4: we reach a timeout
10195
sleep(i)
10296
i = i + 1L
103-
suppressMessages(sync(reg = reg))
104-
saveRegistry(reg)
105-
batch.ids = getBatchIds(reg)
106-
"!DEBUG [waitForJobs]: New batch.ids: `stri_flatten(batch.ids$batch.id, ',')`"
97+
if (Sys.time() > timeout) {
98+
pb$update(1)
99+
warning("Timeout reached")
100+
return(FALSE)
101+
}
102+
103+
if (suppressMessages(sync(reg = reg)))
104+
saveRegistry(reg)
107105
}
108106
}
107+

man/waitForJobs.Rd

+10-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/testthat/test_ClusterFunctionsMulticore.R

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ test_that("cf multicore", {
99
silent({
1010
submitJobs(1:2, reg = reg)
1111
expect_equal(findOnSystem(reg = reg), findJobs(reg = reg))
12-
expect_true(waitForJobs(sleep = 0.2, reg = reg))
12+
expect_true(waitForJobs(sleep = 0.2, expire.after = 1, reg = reg))
1313
})
1414
expect_data_table(findOnSystem(reg = reg), nrow = 0)
1515
expect_equal(findDone(reg = reg), findJobs(reg = reg))

tests/testthat/test_JobCollection.R

+14
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,20 @@ test_that("makeJobCollection", {
2323
expect_output(print(jc), "Collection")
2424
})
2525

26+
27+
test_that("makeJobCollection does not expand relative paths", {
28+
skip_on_os("windows")
29+
reg = makeRegistry(file.dir = NA, make.default = FALSE)
30+
batchMap(identity, 1, reg = reg)
31+
reg$file.dir = npath("~/foo", must.work = FALSE)
32+
reg$work.dir = npath("~/bar", must.work = FALSE)
33+
jc = makeJobCollection(1, reg = reg)
34+
expect_true(stri_startswith_fixed(jc$file.dir, "~/foo"))
35+
expect_true(stri_startswith_fixed(jc$uri, "~/foo/jobs/"))
36+
expect_true(stri_startswith_fixed(jc$log.file, "~/foo/logs"))
37+
expect_true(stri_startswith_fixed(jc$work.dir, "~/bar"))
38+
})
39+
2640
test_that("makeJobCollection.ExperimentCollection", {
2741
reg = makeExperimentRegistry(file.dir = NA, make.default = FALSE)
2842
addProblem(reg = reg, "p1", fun = function(job, data, ...) list(data = data, ...))

tests/testthat/test_waitForJobs.R

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ test_that("waitForJobs", {
77
silent({
88
submitJobs(ids, reg = reg)
99
expect_true(waitForJobs(ids = ids[1], reg = reg, sleep = 1))
10-
expect_false(waitForJobs(ids = ids, stop.on.error = TRUE, sleep = 1, reg = reg))
10+
expect_false(waitForJobs(ids = ids, stop.on.error = TRUE, sleep = 1, expire.after = 1, reg = reg))
1111
})
1212
})
1313

0 commit comments

Comments
 (0)