From 7b9b179e8a05e00d6d4b78533209ed320998c519 Mon Sep 17 00:00:00 2001 From: joh-mue Date: Fri, 3 Jul 2015 13:29:47 +0200 Subject: [PATCH] Added support for 'Yarn'. Closes #43. --- .../src/main/resources/peel-extensions.xml | 10 ++ .../main/resources/reference.yarn-2.4.1.conf | 13 +++ .../main/resources/reference.yarn-2.7.1.conf | 13 +++ .../hadoop-2/conf/hadoop-env.sh.mustache | 2 +- .../extensions/hadoop/beans/system/Yarn.scala | 104 ++++++++++++++++++ 5 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 peel-extensions/src/main/resources/reference.yarn-2.4.1.conf create mode 100644 peel-extensions/src/main/resources/reference.yarn-2.7.1.conf create mode 100644 peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/hadoop/beans/system/Yarn.scala diff --git a/peel-extensions/src/main/resources/peel-extensions.xml b/peel-extensions/src/main/resources/peel-extensions.xml index 21a8b4a2..f783498d 100644 --- a/peel-extensions/src/main/resources/peel-extensions.xml +++ b/peel-extensions/src/main/resources/peel-extensions.xml @@ -43,6 +43,16 @@ --> + + + + + + + + + + diff --git a/peel-extensions/src/main/resources/reference.yarn-2.4.1.conf b/peel-extensions/src/main/resources/reference.yarn-2.4.1.conf new file mode 100644 index 00000000..103092f5 --- /dev/null +++ b/peel-extensions/src/main/resources/reference.yarn-2.4.1.conf @@ -0,0 +1,13 @@ +# include common hadoop-2.x configuration +include "reference.hadoop-2.x.conf" + +system { + hadoop-2 { + path { + archive.url = "https://archive.apache.org/dist/hadoop/core/hadoop-2.4.1/hadoop-2.4.1.tar.gz" + archive.md5 = "0CE4CFD282002B7AA42CF71DF4145150" + archive.src = ${app.path.downloads}"/hadoop-2.4.1.tar.gz" + home = ${system.hadoop-2.path.archive.dst}"/hadoop-2.4.1" + } + } +} \ No newline at end of file diff --git a/peel-extensions/src/main/resources/reference.yarn-2.7.1.conf b/peel-extensions/src/main/resources/reference.yarn-2.7.1.conf new file mode 100644 index 00000000..7cceddfa --- /dev/null +++ b/peel-extensions/src/main/resources/reference.yarn-2.7.1.conf @@ -0,0 +1,13 @@ +# include common hadoop-2.x configuration +include "reference.hadoop-2.x.conf" + +system { + hadoop-2 { + path { + archive.url = "https://archive.apache.org/dist/hadoop/core/hadoop-2.7.1/hadoop-2.7.1.tar.gz" + archive.md5 = "203E5B4DAF1C5658C3386A32C4BE5531" + archive.src = ${app.path.downloads}"/hadoop-2.7.1.tar.gz" + home = ${system.hadoop-2.path.archive.dst}"/hadoop-2.7.1" + } + } +} \ No newline at end of file diff --git a/peel-extensions/src/main/resources/templates/hadoop-2/conf/hadoop-env.sh.mustache b/peel-extensions/src/main/resources/templates/hadoop-2/conf/hadoop-env.sh.mustache index 56ae74a6..8fa4e091 100644 --- a/peel-extensions/src/main/resources/templates/hadoop-2/conf/hadoop-env.sh.mustache +++ b/peel-extensions/src/main/resources/templates/hadoop-2/conf/hadoop-env.sh.mustache @@ -57,7 +57,7 @@ done #export HADOOP_NAMENODE_INIT_HEAPSIZE="" # Root logger log level (default is INFO) -# {{#HADOOP_ROOT_LOGGER}}export HADOOP_ROOT_LOGGER={{HADOOP_ROOT_LOGGER}}{{/HADOOP_ROOT_LOGGER}}{{^HADOOP_ROOT_LOGGER}}export HADOOP_ROOT_LOGGER=INFO,console{{/HADOOP_ROOT_LOGGER}} +{{#HADOOP_ROOT_LOGGER}}export HADOOP_ROOT_LOGGER={{HADOOP_ROOT_LOGGER}}{{/HADOOP_ROOT_LOGGER}}{{^HADOOP_ROOT_LOGGER}}export HADOOP_ROOT_LOGGER=INFO,console{{/HADOOP_ROOT_LOGGER}} # Extra Java runtime options. Empty by default. {{#HADOOP_OPTS}}export HADOOP_OPTS={{HADOOP_OPTS}}{{/HADOOP_OPTS}}{{^HADOOP_OPTS}}export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true"{{/HADOOP_OPTS}} diff --git a/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/hadoop/beans/system/Yarn.scala b/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/hadoop/beans/system/Yarn.scala new file mode 100644 index 00000000..e34aa4f1 --- /dev/null +++ b/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/hadoop/beans/system/Yarn.scala @@ -0,0 +1,104 @@ +package eu.stratosphere.peel.extensions.hadoop.beans.system + +import com.samskivert.mustache.Mustache +import com.typesafe.config.ConfigException +import eu.stratosphere.peel.core.beans.system.Lifespan.Lifespan +import eu.stratosphere.peel.core.beans.system.{SetUpTimeoutException, System} +import eu.stratosphere.peel.core.config.{Model, SystemConfig} +import eu.stratosphere.peel.core.util.shell + +import scala.collection.JavaConverters._ + +/** Wrapper class for Yarn + * + * Implements Yarn as a [[eu.stratosphere.peel.core.beans.system.System System]] class and provides setup and teardown methods. + * + * @param version Version of the system (e.g. "7.1") + * @param lifespan [[eu.stratosphere.peel.core.beans.system.Lifespan Lifespan]] of the system + * @param dependencies Set of dependencies that this system needs + * @param mc The moustache compiler to compile the templates that are used to generate property files for the system + */ +class Yarn(version: String, lifespan: Lifespan, dependencies: Set[System] = Set(), mc: Mustache.Compiler) extends System("yarn", version, lifespan, dependencies, mc) { + + override val configKey = "hadoop-2" + + // --------------------------------------------------- + // System. + // --------------------------------------------------- + + override def configuration() = SystemConfig(config, { + val conf = config.getString(s"system.$configKey.path.config") + List( + SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.slaves", s"$conf/slaves", templatePath("conf/hosts"), mc), + SystemConfig.Entry[Model.Env](s"system.$configKey.config.env", s"$conf/hadoop-env.sh", templatePath("conf/hadoop-env.sh"), mc), + SystemConfig.Entry[Model.Site](s"system.$configKey.config.core", s"$conf/core-site.xml", templatePath("conf/site.xml"), mc), + SystemConfig.Entry[Model.Site](s"system.$configKey.config.yarn", s"$conf/yarn-site.xml", templatePath("conf/site.xml"), mc) + ) + }) + + override def start(): Unit = { + val user = config.getString(s"system.$configKey.user") + val logDir = config.getString(s"system.$configKey.path.log") + + // check if tmp dir exists and create if not + try { + val tmpDir = config.getString(s"system.$configKey.config.defaults.spark.local.dir") + + for (nodemanager <- config.getStringList(s"system.$configKey.config.slaves").asScala) { + logger.info(s"Initializing tmp directory $tmpDir at taskmanager node $nodemanager") + shell ! s""" ssh $user@$nodemanager "rm -Rf $tmpDir" """ + shell ! s""" ssh $user@$nodemanager "mkdir -p $tmpDir" """ + } + } catch { + case _: ConfigException => // ignore not set explicitly, java default is taken + } + + var failedStartUpAttempts = 0 + while(!isUp) { + try { + val total = config.getStringList(s"system.$configKey.config.slaves").size() + // yarn does not reset the resourcemanagers log at startup + val init = Integer.parseInt((shell !! s"""cat $logDir/yarn-$user-resourcemanager-*.log | grep 'registered with capability:' | wc -l""").trim()) + + shell ! s"${config.getString(s"system.$configKey.path.home")}/sbin/yarn-daemon.sh start resourcemanager" + shell ! s"${config.getString(s"system.$configKey.path.home")}/sbin/yarn-daemon.sh start nodemanager" + logger.info(s"Waiting for nodes to connect") + + var curr = init + var cntr = config.getInt(s"system.$configKey.startup.polling.counter") + while (curr - init < total) { + logger.info(s"Connected ${curr - init} from $total nodes") + // wait a bit + Thread.sleep(config.getInt(s"system.$configKey.startup.polling.interval")) + // get new values + curr = Integer.parseInt((shell !! s"""cat $logDir/yarn-$user-resourcemanager-*.log | grep 'registered with capability:' | wc -l""").trim()) + // timeout if counter goes below zero + cntr = cntr - 1 + if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ") + } + logger.info(s"Connected ${curr - init} from $total nodes") + isUp = true + } catch { + case e: SetUpTimeoutException => + failedStartUpAttempts = failedStartUpAttempts + 1 + if (failedStartUpAttempts < config.getInt(s"system.$configKey.startup.max.attempts")) { + stop() + logger.info(s"Could not bring system '$toString' up in time, trying again...") + } else { + throw e + } + } + } + } + + override def stop(): Unit = { + shell ! s"${config.getString(s"system.$configKey.path.home")}/sbin/yarn-daemon.sh stop resourcemanager" + shell ! s"${config.getString(s"system.$configKey.path.home")}/sbin/yarn-daemon.sh stop nodemanager" + + isUp = false + } + + def isRunning = { + (shell ! s"""ps -ef | grep 'yarn' | grep 'java' | grep 'resourcemanager' | grep -v 'grep' """) == 0 // TODO: fix using PID + } +} \ No newline at end of file