Adds automatic cleanup of Spark work directory
Spark uses the 'work' directory to store files (job jars, ...) required to
run. By default these files are never deleted, which can cause quite
heavy disk consumption.

This commit enables a periodic cleanup of the directory during the
execution of a suite/experiment (configured in reference.spark.conf) plus a
manual cleanup on startup and shutdown of Spark.
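The periodic cleanup is driven by Spark's standalone worker cleanup properties, passed to the worker JVMs through SPARK_WORKER_OPTS. A minimal sketch of the resulting spark-env.sh entry, using the values set in the configuration change below (5 min interval, 1 h TTL):

    # Worker options that drive the periodic cleanup:
    #   spark.worker.cleanup.enabled    - switch the work-dir cleaner on
    #   spark.worker.cleanup.interval   - run the cleaner every 300 s (5 min)
    #   spark.worker.cleanup.appDataTtl - remove application dirs older than 3600 s (1 h)
    SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=300 -Dspark.worker.cleanup.appDataTtl=3600"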
akunft committed Jun 23, 2016
1 parent 0ac37a9 commit 2731d71
Showing 3 changed files with 22 additions and 5 deletions.
peel-extensions/src/main/resources/reference.spark.conf: 3 additions & 0 deletions
@@ -8,6 +8,7 @@ system {
home = ${app.path.systems}"/spark"
config = ${system.spark.path.home}"/conf"
log = ${system.spark.path.home}"/logs"
work = ${system.spark.path.home}"/work"
}
startup {
max.attempts = ${system.default.startup.max.attempts}
@@ -28,6 +29,8 @@ system {
SPARK_WORKER_CORES = ${system.default.config.parallelism.per-node}
SPARK_EXECUTOR_CORES = ${system.default.config.parallelism.per-node}
SPARK_EXECUTOR_MEMORY = "512m"
# Enables periodic cleanup of worker / application dirs every 5 min. for data older than 1 hour.
SPARK_WORKER_OPTS = """"-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=300 -Dspark.worker.cleanup.appDataTtl=3600""""
}
# spark-defaults.conf entries
defaults {
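The four quotes around the SPARK_WORKER_OPTS value above are a HOCON triple-quoted string whose first and last characters are literal double quotes, so the stored value already carries its own surrounding quotes. A sketch of the quoting chain, with a generic -Dkey=value standing in for the real option string:

    HOCON entry:                SPARK_WORKER_OPTS = """"-Dkey=value""""
    stored value:               "-Dkey=value"                      (quotes are part of the value)
    rendered spark-env.sh line: SPARK_WORKER_OPTS="-Dkey=value"    (via the template change below)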
@@ -42,7 +42,7 @@
{{#SPARK_WORKER_WEBUI_PORT}}SPARK_WORKER_WEBUI_PORT={{SPARK_WORKER_WEBUI_PORT}}{{/SPARK_WORKER_WEBUI_PORT}}{{^SPARK_WORKER_WEBUI_PORT}}# - SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker{{/SPARK_WORKER_WEBUI_PORT}}
{{#SPARK_WORKER_INSTANCES}}SPARK_WORKER_INSTANCES={{SPARK_WORKER_INSTANCES}}{{/SPARK_WORKER_INSTANCES}}{{^SPARK_WORKER_INSTANCES}}# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node{{/SPARK_WORKER_INSTANCES}}
{{#SPARK_WORKER_DIR}}SPARK_WORKER_DIR={{SPARK_WORKER_DIR}}{{/SPARK_WORKER_DIR}}{{^SPARK_WORKER_DIR}}# - SPARK_WORKER_DIR, to set the working directory of worker processes{{/SPARK_WORKER_DIR}}
{{#SPARK_WORKER_OPTS}}SPARK_WORKER_OPTS={{SPARK_WORKER_OPTS}}{{/SPARK_WORKER_OPTS}}{{^SPARK_WORKER_OPTS}}# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y"){{/SPARK_WORKER_OPTS}}
{{#SPARK_WORKER_OPTS}}SPARK_WORKER_OPTS={{{SPARK_WORKER_OPTS}}}{{/SPARK_WORKER_OPTS}}{{^SPARK_WORKER_OPTS}}# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y"){{/SPARK_WORKER_OPTS}}
{{#SPARK_HISTORY_OPTS}}SPARK_HISTORY_OPTS={{SPARK_HISTORY_OPTS}}{{/SPARK_HISTORY_OPTS}}{{^SPARK_HISTORY_OPTS}}# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y"){{/SPARK_HISTORY_OPTS}}
{{#SPARK_DAEMON_JAVA_OPTS}}SPARK_DAEMON_JAVA_OPTS={{SPARK_DAEMON_JAVA_OPTS}}{{/SPARK_DAEMON_JAVA_OPTS}}{{^SPARK_DAEMON_JAVA_OPTS}}# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y"){{/SPARK_DAEMON_JAVA_OPTS}}
{{#SPARK_PUBLIC_DNS}}SPARK_PUBLIC_DNS={{SPARK_PUBLIC_DNS}}{{/SPARK_PUBLIC_DNS}}{{^SPARK_PUBLIC_DNS}}# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers{{/SPARK_PUBLIC_DNS}}
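The one-line template change above switches SPARK_WORKER_OPTS from a double- to a triple-mustache tag: {{...}} HTML-escapes the interpolated value, which would turn the surrounding quotes into &quot;, whereas {{{...}}} emits the value verbatim. A sketch of the difference for a value of "-Dx=y" (illustrative, not tool output):

    {{SPARK_WORKER_OPTS}}   renders as  SPARK_WORKER_OPTS=&quot;-Dx=y&quot;   (escaped, broken shell)
    {{{SPARK_WORKER_OPTS}}} renders as  SPARK_WORKER_OPTS="-Dx=y"             (unescaped, as intended)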
@@ -140,13 +140,27 @@ class Spark(
s""" ssh $user@$host "$cmd" """
}

val rmWorkDir = (host: String, workDir: String) => {
val cmd = s""" rm -Rf $workDir/* """
s""" ssh $user@$host "$cmd" """
}

val hosts = config.getStringList(s"system.$configKey.config.slaves").asScala
val paths = config.getString(s"system.$configKey.config.defaults.spark.local.dir").split(',')
val workDir = config.getString(s"system.$configKey.path.work")

val futureInitOps = Future.traverse(hosts)(host => Future {
logger.info(s"Initializing Spark tmp directories '${paths.mkString(",")}' at $host")
shell ! (init(host, paths), s"Unable to initialize Spark tmp directories '${paths.mkString(",")}' at $host.")
})
val futureInitOps = Future.traverse(hosts){ host =>
for {
_ <- Future {
logger.info(s"Initializing Spark tmp directories '${paths.mkString(",")}' at $host")
shell ! (init(host, paths), s"Unable to initialize Spark tmp directories '${paths.mkString(",")}' at $host.")
}
f <- Future {
logger.debug(s"Removing Spark work directory content '$workDir' at $host")
shell ! (rmWorkDir(host, workDir), s"Unable to remove Spark work directory content '$workDir' at $host.", fatal = false)
}
} yield f
}

// await for all futureInitOps to finish
Await.result(futureInitOps, Math.max(30, 5 * hosts.size).seconds)
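In the Scala hunk above, the first futureInitOps block is the version being removed and the for-comprehension that follows is its replacement: per host, the tmp-directory initialization and the work-directory cleanup are chained so the rm only runs after the init step has finished, while Future.traverse still processes all hosts in parallel; fatal = false keeps a failing rm (for example on a not-yet-created work directory) from aborting startup. A minimal, self-contained sketch of that pattern (helper name and parameters are hypothetical, not part of the commit):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    // Run a setup step and a best-effort cleanup step sequentially per host,
    // but across all hosts in parallel, then block until every host is done.
    def initAndClean(hosts: Seq[String])(setup: String => Unit, cleanup: String => Unit): Unit = {
      val ops = Future.traverse(hosts) { host =>
        for {
          _ <- Future(setup(host))   // e.g. create the Spark tmp directories
          r <- Future(cleanup(host)) // e.g. ssh host "rm -Rf $workDir/*"
        } yield r
      }
      Await.result(ops, Math.max(30, 5 * hosts.size).seconds)
    }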
