Skip to content
Snippets Groups Projects
Commit 57e284ef authored by Florian Angerer's avatar Florian Angerer
Browse files

Do cleanup of lock files if expired.

parent 85abda31
No related branches found
No related tags found
No related merge requests found
......@@ -25,6 +25,9 @@ lock.file.name <- ".lock"
pkg.cache.max.retries <- 3L
# expiration time of a lock in seconds (default: 1 hour)
pkg.cache.lock.expiration <- 3600
pkg.cache.lock <- function(pkg.cache.env, version.dir) {
if (!as.logical(pkg.cache.env$sync)) {
return (TRUE)
......@@ -32,16 +35,68 @@ pkg.cache.lock <- function(pkg.cache.env, version.dir) {
tries <- 0L
version.lock.file <- file.path(version.dir, lock.file.name)
log.message("try to lock: ", version.lock.file, level=1)
pkg.cache.lock.cleanup.expired(version.lock.file)
while (file.exists(version.lock.file)) {
Sys.sleep(1)
tries <- tries + 1L
if (tries >= pkg.cache.max.retries) {
log.message("cannot lock ", version.dir, " because it is already locked")
return (FALSE)
}
pkg.cache.lock.cleanup.expired(version.lock.file)
}
log.message("locking: ", version.lock.file, level=1)
tryCatch(file.create(version.lock.file), error=function(e) return (FALSE), warning=function(e) return (FALSE))
tryCatch({
# creates a file with permissions "-r--r--r--"
umask.bak <- Sys.umask("333")
# this will fail if the file already exists (due to permissions)
cat(paste0(as.double(Sys.time() + pkg.cache.lock.expiration), "\n"), file=version.lock.file)
Sys.umask(umask.bak)
return (TRUE)
}, error = function(e) {
log.message("error when creating lock file ", version.lock.file, ": ", e$message)
Sys.umask(umask.bak)
return (FALSE)
}, warning = function(e) {
log.message("warning when creating lock file ", version.lock.file, ": ", e$message, level=1)
Sys.umask(umask.bak)
})
TRUE
}
pkg.cache.lock.cleanup.expired <- function(lock.file) {
# see if lock is expired
log.message("check expiration of lock file: ", lock.file, level=1)
if (file.exists(lock.file)) {
cur.time <- Sys.time()
v <- as.double(readLines(lock.file, n=1))
if (length(v) == 0 || is.na(v)) {
log.message("removing lock file without expiration: ", lock.file, level=1)
tryCatch({
unlink(lock.file, force=TRUE)
}, error = function(e) {
log.message(paste0("Failed to remove lock file ", lock.file, " because: ", as.character(e)), level=1)
})
} else {
class(v) <- class(cur.time)
if (v - cur.time < 0) {
# this also works if the write permission is missing
log.message("removing expired lock file: ", lock.file, " (expired on: ", as.character(v), ")", level=1)
tryCatch({
unlink(lock.file, force=TRUE)
}, error = function(e) {
log.message(paste0("Failed to remove lock file ", lock.file, " because: ", as.character(e)))
})
} else {
log.message("lock file not expired (expires on ", as.character(v), ")", level=1)
}
}
} else {
log.message("nothing to remove (", lock.file, " does not exist)", level=1)
}
}
pkg.cache.unlock <- function(pkg.cache.env, version.dir) {
......@@ -51,7 +106,10 @@ pkg.cache.unlock <- function(pkg.cache.env, version.dir) {
version.lock.file <- file.path(version.dir, lock.file.name)
log.message("releasing: ", version.lock.file, level=1)
tryCatch(file.remove(version.lock.file))
tryCatch({
unlink(version.lock.file, force=TRUE)
}, errors = function(e) { log.message(as.character(e), level=1) })
TRUE
}
is.fastr <- function() {
......@@ -94,7 +152,7 @@ pkg.cache.get <- function(pkg.cache.env, pkg, lib) {
# lock version directory
if (!pkg.cache.lock(pkg.cache.env, version.dir)) {
log.message("could not lock version dir ", version.dir, level=1)
log.message("could not fetch: version dir ", version.dir, " is locked", level=1)
return (FALSE)
}
......@@ -115,12 +173,15 @@ pkg.cache.get <- function(pkg.cache.env, pkg, lib) {
tryCatch({
unzip(fromPath, exdir=toPath, unzip = getOption("unzip"))
log.message("package cache hit, using package from ", fromPath)
pkg.cache.unlock(pkg.cache.env, version.dir)
return (TRUE)
}, error = function(e) {
pkg.cache.unlock(pkg.cache.env, version.dir)
log.message("could not extract cached package from ", fromPath , " to ", toPath, level=1)
return (FALSE)
}, finally = pkg.cache.unlock(pkg.cache.env, version.dir) )
})
}
pkg.cache.unlock(pkg.cache.env, version.dir)
log.message("cache miss for package ", pkgname, level=1)
FALSE
......@@ -171,16 +232,18 @@ pkg.cache.insert <- function(pkg.cache.env, pkg, lib) {
})
if(zip(toPath, pkgname, flags="-r9Xq") != 0L) {
pkg.cache.unlock(pkg.cache.env, version.dir)
log.message("could not compress package dir ", fromPath , " and store it to ", toPath, level=1)
return (FALSE)
}
setwd(prev.wd)
log.message("successfully inserted package ", pkgname , " to package cache (", toPath, ")")
pkg.cache.unlock(pkg.cache.env, version.dir)
return (TRUE)
}, error = function(e) {
pkg.cache.unlock(pkg.cache.env, version.dir)
log.message("could not insert package '", pkgname, "' because: ", e$message)
}, finally = pkg.cache.unlock(pkg.cache.env, version.dir) )
})
FALSE
}
......@@ -319,13 +382,17 @@ pkg.cache.get.version <- function(pkg.cache.env, cache.dir, cache.version, table
}
if (!is.null(updated.version.table)) {
version.subdir <- as.character(updated.version.table[updated.version.table$version == cache.version, "dir"])
if (!pkg.cache.lock(pkg.cache.env, cache.dir)) {
return (NULL)
}
try({
tryCatch({
write.csv(updated.version.table, version.table.name, row.names=FALSE)
}, finally=pkg.cache.unlock(pkg.cache.env, cache.dir))
pkg.cache.unlock(pkg.cache.env, cache.dir)
}, error = function(e) {
pkg.cache.unlock(pkg.cache.env, cache.dir)
})
}
# return the version directory
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment