amaterasu-dev mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] eyalbenivri closed pull request #5: Amaterasu 17
Date Thu, 01 Jan 1970 00:00:00 GMT
eyalbenivri closed pull request #5: Amaterasu 17
URL: https://github.com/apache/incubator-amaterasu/pull/5

This is a PR merged from a forked repository. As GitHub hides the
original diff once a foreign pull request is merged, it is reproduced
below for the sake of provenance:

diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
index efc00fc..653c285 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
@@ -68,7 +68,7 @@ class ClusterConfig extends Logging {
 
     class Master {
       var cores: Int = 1
-      var memoryMB: Int = 512
+      var memoryMB: Int = 1024
 
       def load(props: Properties): Unit = {
         if (props.containsKey("yarn.master.cores")) this.cores = props.getProperty("yarn.master.cores").asInstanceOf[Int]
@@ -128,7 +128,7 @@ class ClusterConfig extends Logging {
   object Jobs {
 
     var cpus: Double = 1
-    var mem: Long = 512
+    var mem: Long = 1024
     var repoSize: Long = 1024
 
     def load(props: Properties): Unit = {
diff --git a/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala b/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala
new file mode 100644
index 0000000..8c000cc
--- /dev/null
+++ b/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala
@@ -0,0 +1,12 @@
+package org.apache.amaterasu.common.utils
+
+import java.io.File
+
+object FileUtils {
+
+  def getAllFiles(dir: File): Array[File] = {
+    val these = dir.listFiles
+    these ++ these.filter(_.isDirectory).flatMap(getAllFiles)
+  }
+
+}
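
A minimal usage sketch of the new FileUtils helper (the directory path is hypothetical, and dir.listFiles returns null for a path that is not a directory, so a valid directory is assumed):

import java.io.File
import org.apache.amaterasu.common.utils.FileUtils

object FileUtilsSketch extends App {
  val pkgs = new File("/tmp/miniconda/pkgs") // hypothetical directory
  if (pkgs.isDirectory) {
    // all files under pkgs plus, recursively, those of every sub-directory
    val all = FileUtils.getAllFiles(pkgs)
    all.filter(_.getName.endsWith(".py")).foreach(f => println(f.getAbsolutePath))
  }
}
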
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
index 42faf71..eec0106 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
@@ -46,7 +46,8 @@ object ProvidersFactory {
             outStream: ByteArrayOutputStream,
             notifier: Notifier,
             executorId: String,
-            propFile:String = null): ProvidersFactory = {
+            hostName: String,
+            propFile: String = null): ProvidersFactory = {
 
     val result = new ProvidersFactory()
     val reflections = new Reflections(getClass.getClassLoader)
@@ -56,7 +57,7 @@ object ProvidersFactory {
 
       val provider = Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[RunnersProvider]
 
-      provider.init(data, jobId, outStream, notifier, executorId, propFile)
+      provider.init(data, jobId, outStream, notifier, executorId, propFile, hostName)
       notifier.info(s"a provider for group ${provider.getGroupIdentifier} was created")
       (provider.getGroupIdentifier, provider)
     }).toMap
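
ProvidersFactory scans the classpath with the reflections library, instantiates every RunnersProvider through its no-arg constructor, and now threads the new hostName argument into each provider's init. A self-contained sketch of that discovery pattern, assuming the reflections library is on the classpath (Greeter and SparkGreeter are illustrative stand-ins, not Amaterasu types):

import org.reflections.Reflections

import scala.collection.JavaConversions._

trait Greeter { def groupId: String }
class SparkGreeter extends Greeter { def groupId: String = "spark" }

object DiscoverySketch extends App {
  // find every concrete subtype of Greeter and key it by its group id,
  // as ProvidersFactory does for RunnersProvider implementations
  val reflections = new Reflections(getClass.getClassLoader)
  val providers = reflections.getSubTypesOf(classOf[Greeter])
    .map(r => Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[Greeter])
    .map(p => p.groupId -> p).toMap
  println(providers.keySet) // Set(spark)
}
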
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
index 7e56742..ce3b2ba 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
@@ -52,7 +52,8 @@ class SparkRunnersProvider extends RunnersProvider with Logging {
                     outStream: ByteArrayOutputStream,
                     notifier: Notifier,
                     executorId: String,
-                    propFile: String): Unit = {
+                    propFile: String,
+                    hostName: String): Unit = {
 
     val config = ClusterConfig(new FileInputStream(propFile))
     shellLoger = ProcessLogger(
@@ -76,7 +77,7 @@ class SparkRunnersProvider extends RunnersProvider with Logging {
     val sparkAppName = s"job_${jobId}_executor_$executorId"
 
     SparkRunnerHelper.notifier = notifier
-    val spark = SparkRunnerHelper.createSpark(data.env, sparkAppName, jars, conf, executorEnv, propFile)
+    val spark = SparkRunnerHelper.createSpark(data.env, sparkAppName, jars, conf, executorEnv, propFile, hostName)
 
     lazy val sparkScalaRunner = SparkScalaRunner(data.env, jobId, spark, outStream, notifier, jars)
     sparkScalaRunner.initializeAmaContext(data.env)
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala
index 7152ff6..969eb0b 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala
@@ -78,15 +78,18 @@ class ActionsExecutor extends Executor with Logging {
     this.executorDriver = driver
     val data = mapper.readValue(new ByteArrayInputStream(executorInfo.getData.toByteArray), classOf[ExecData])
 
+    // this is used to resolve the spark driver address
+    val hostName = slaveInfo.getHostname
     notifier = new MesosNotifier(driver)
     notifier.info(s"Executor ${executorInfo.getExecutorId.getValue} registered")
     val outStream = new ByteArrayOutputStream()
-    providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue)
+    providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName)
 
   }
 
   override def launchTask(driver: ExecutorDriver, taskInfo: TaskInfo): Unit = {
 
+
     notifier.info(s"launching task: ${taskInfo.getTaskId.getValue}")
     log.debug(s"launching task: $taskInfo")
     val status = TaskStatus.newBuilder
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
index 3ad2fda..05637cb 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
@@ -1,7 +1,7 @@
 package org.apache.amaterasu.executor.yarn.executors
 
 import java.io.ByteArrayOutputStream
-import java.net.URLDecoder
+import java.net.{InetAddress, URLDecoder}
 
 import scala.collection.JavaConverters._
 import com.fasterxml.jackson.databind.ObjectMapper
@@ -9,6 +9,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.executor.common.executors.ProvidersFactory
+import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.spark.SparkContext
 
@@ -58,7 +59,9 @@ object ActionsExecutorLauncher extends App with Logging {
     case _ => urlses(cl.getParent)
   }
 
+  val hostName = InetAddress.getLocalHost.getHostName
 
+  log.info(s"Hostname resolved to: $hostName")
   val mapper = new ObjectMapper()
   mapper.registerModule(DefaultScalaModule)
 
@@ -89,6 +92,6 @@ object ActionsExecutorLauncher extends App with Logging {
   val notifier = new YarnNotifier(new YarnConfiguration())
 
   log.info("Setup notifier")
-  actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, notifier, taskIdAndContainerId, propFile = "./amaterasu.properties")
+  actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, notifier, taskIdAndContainerId, hostName, propFile = "./amaterasu.properties")
   actionsExecutor.execute()
 }
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
index 537bde8..ba6a3e1 100644
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
+++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
@@ -22,6 +22,7 @@ import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.execution.actions.Notifier
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.common.utils.FileUtils
 import org.apache.spark.repl.amaterasu.AmaSparkILoop
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.Utils
@@ -103,12 +104,14 @@ object SparkRunnerHelper extends Logging {
     interpreter = result
   }
 
-  def getAllFiles(dir: File): Array[File] = {
-    val these = dir.listFiles
-    these ++ these.filter(_.isDirectory).flatMap(getAllFiles)
-  }
 
-  def createSpark(env: Environment, sparkAppName: String, jars: Seq[String], sparkConf: Option[Map[String, Any]], executorEnv: Option[Map[String, Any]], propFile: String): SparkSession = {
+  def createSpark(env: Environment,
+                  sparkAppName: String,
+                  jars: Seq[String],
+                  sparkConf: Option[Map[String, Any]],
+                  executorEnv: Option[Map[String, Any]],
+                  propFile: String,
+                  hostName: String): SparkSession = {
 
     val config = if (propFile != null) {
       import java.io.FileInputStream
@@ -119,12 +122,12 @@ object SparkRunnerHelper extends Logging {
 
     Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
 
-    val pyfiles = getAllFiles(new File("miniconda/pkgs")).filter(f => f.getName.endsWith(".py") ||
+    val pyfiles = FileUtils.getAllFiles(new File("miniconda/pkgs")).filter(f => f.getName.endsWith(".py") ||
       f.getName.endsWith(".egg") ||
       f.getName.endsWith(".zip"))
 
     conf.setAppName(sparkAppName)
-      .set("spark.driver.host", getNode)
+      .set("spark.driver.host", hostName)
       .set("spark.submit.deployMode", "client")
       .set("spark.hadoop.validateOutputSpecs", "false")
       .set("spark.logConf", "true")
@@ -149,7 +152,7 @@ object SparkRunnerHelper extends Logging {
 
           .set("spark.master", "yarn")
           .set("spark.executor.instances", "1") // TODO: change this
-          .set("spark.yarn.jars", s"${config.spark.home}/jars/*")
+          .set("spark.yarn.jars", s"spark/jars/*")
           .set("spark.executor.memory", "1g")
           .set("spark.dynamicAllocation.enabled", "false")
           //.set("spark.shuffle.service.enabled", "true")
diff --git a/leader/build.gradle b/leader/build.gradle
index 2bbeb75..9ecd17c 100644
--- a/leader/build.gradle
+++ b/leader/build.gradle
@@ -21,8 +21,8 @@ plugins {
     id 'java'
 }
 
-sourceCompatibility = 1.7
-targetCompatibility = 1.7
+sourceCompatibility = 1.8
+targetCompatibility = 1.8
 
 repositories {
     maven {
@@ -33,8 +33,11 @@ repositories {
 
 dependencies {
     compile 'org.scala-lang:scala-library:2.11.8'
+    compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
+//    compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8'
 
     compile project(':common')
+    compile project(':amaterasu-sdk')
 
     compile group: 'com.github.scopt', name: 'scopt_2.11', version: '3.3.0'
     compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0'
@@ -57,6 +60,7 @@ dependencies {
     compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
     compile group: 'org.scala-lang.modules', name: 'scala-async_2.11', version: '0.9.6'
     compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
+    compile group: 'org.reflections', name: 'reflections', version: '0.9.11'
 
     testCompile project(':common')
     testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
index bcf890d..e085d6e 100644
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
+++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
@@ -18,6 +18,8 @@
 
 import org.apache.amaterasu.common.configuration.ClusterConfig;
 
+import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory;
+import org.apache.amaterasu.sdk.FrameworkSetupProvider;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -108,7 +110,7 @@ public void run(JobOpts opts, String[] args) throws Exception {
 
         List<String> commands = Collections.singletonList(
                 "env AMA_NODE=" + System.getenv("AMA_NODE") + " " +
-                "$JAVA_HOME/bin/java" +
+                        "$JAVA_HOME/bin/java" +
                         " -Dscala.usejavacp=false" +
                         " -Xmx1G" +
                         " org.apache.amaterasu.leader.yarn.ApplicationMaster " +
@@ -127,16 +129,33 @@ public void run(JobOpts opts, String[] args) throws Exception {
         try {
             if (!fs.exists(jarPathQualified)) {
                 File home = new File(opts.home);
+                fs.mkdirs(jarPathQualified);
                 for (File f : home.listFiles()) {
-                    fs.mkdirs(jarPathQualified);
                     fs.copyFromLocalFile(false, true, new Path(f.getAbsolutePath()), jarPathQualified);
                 }
+
+                // setup frameworks
+                FrameworkProvidersFactory frameworkFactory = FrameworkProvidersFactory.apply(config);
+                for (String group : frameworkFactory.groups()) {
+                    FrameworkSetupProvider framework = frameworkFactory.getFramework(group);
+
+                    //creating a group folder
+                    Path frameworkPath = Path.mergePaths(jarPathQualified, new Path("/" + framework.getGroupIdentifier()));
+                    System.out.println("===> " + frameworkPath.toString());
+
+                    fs.mkdirs(frameworkPath);
+                    for (File file : framework.getGroupResources()) {
+                        if (file.exists())
+                            fs.copyFromLocalFile(false, true, new Path(file.getAbsolutePath()), frameworkPath);
+                    }
+                }
             }
         } catch (IOException e) {
             LOGGER.error("Error uploading ama folder to HDFS.", e);
             System.exit(3);
         }
 
+
         // get version of build
         String version = config.version();
 
@@ -228,6 +247,10 @@ public void run(JobOpts opts, String[] args) throws Exception {
         LOGGER.info("Application {} finished with state {}-{} at {}", appId, appState, appReport.getFinalApplicationStatus(), appReport.getFinishTime());
     }
 
+    private static void copyDirRecursive(){
+
+    }
+
     private boolean isAppFinished(YarnApplicationState appState) {
         return appState == YarnApplicationState.FINISHED ||
                 appState == YarnApplicationState.KILLED ||
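
The new block in Client.run uploads each framework's group resources into a per-group folder under the Amaterasu home path on HDFS. The same flow as a compact Scala sketch, assuming fs is an initialized FileSystem and base an existing base Path such as /apps/amaterasu:

import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory
import org.apache.hadoop.fs.{FileSystem, Path}

object FrameworkUploadSketch {
  def upload(fs: FileSystem, base: Path, config: ClusterConfig): Unit = {
    val factory = FrameworkProvidersFactory(config)
    for (group <- factory.groups) {
      val framework = factory.getFramework(group)
      // one folder per framework group, e.g. /apps/amaterasu/spark
      val frameworkPath = Path.mergePaths(base, new Path("/" + framework.getGroupIdentifier))
      fs.mkdirs(frameworkPath)
      for (file <- framework.getGroupResources if file.exists())
        fs.copyFromLocalFile(false, true, new Path(file.getAbsolutePath), frameworkPath)
    }
  }
}
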
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
index b9637ce..8ef1c7a 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
@@ -27,6 +27,7 @@ import org.apache.amaterasu.leader.execution.JobManager
 import org.apache.curator.framework.CuratorFramework
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.io.Source
 
 /**
@@ -103,6 +104,12 @@ object JobParser {
       attempts
     )
 
+    //updating the list of frameworks setup
+    manager.frameworks.getOrElseUpdate(action.data.groupId,
+                                       new mutable.HashSet[String]())
+                                       .add(action.data.typeId)
+
+
     if (manager.head == null)
       manager.head = action
 
@@ -125,6 +132,11 @@ object JobParser {
 
       action.data.errorActionId = errorAction.data.id
       manager.registerAction(errorAction)
+
+      //updating the list of frameworks setup
+      manager.frameworks.getOrElseUpdate(errorAction.data.groupId,
+        new mutable.HashSet[String]())
+        .add(errorAction.data.typeId)
     }
 
     parseActions(actions.tail, manager, actionsQueue, attempts, action)
@@ -150,7 +162,6 @@ object JobParser {
       client,
       attempts
     )
-
   }
 
   def parseErrorAction(
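
JobParser now records, for every parsed action, which runner types each group uses, in the new JobManager.frameworks map (see the JobManager diff below). A minimal sketch of that bookkeeping:

import scala.collection.concurrent.TrieMap
import scala.collection.mutable

object FrameworksMapSketch extends App {
  val frameworks = new TrieMap[String, mutable.HashSet[String]]
  // mirrors JobParser: one entry per group id, accumulating runner type ids
  frameworks.getOrElseUpdate("spark", new mutable.HashSet[String]()).add("scala")
  frameworks.getOrElseUpdate("spark", new mutable.HashSet[String]()).add("sql")
  println(frameworks("spark")) // Set(scala, sql)
}
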
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala
index adb5e8f..38f4b7c 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala
@@ -25,6 +25,7 @@ import org.apache.amaterasu.leader.execution.actions.Action
 import org.apache.curator.framework.CuratorFramework
 
 import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
 
 /**
   * The JobManager manages the lifecycle of a job. It queues new actions for execution,
@@ -33,16 +34,15 @@ import scala.collection.concurrent.TrieMap
   */
 class JobManager extends Logging {
 
-  var name: String = null
-  var jobId: String = null
-  var client: CuratorFramework = null
-  var head: Action = null
-
-  val jobReport = new StringBuilder
+  var name: String = _
+  var jobId: String = _
+  var client: CuratorFramework = _
+  var head: Action = _
 
   // TODO: this is not private due to tests, fix this!!!
   val registeredActions = new TrieMap[String, Action]
-  private var executionQueue: BlockingQueue[ActionData] = null
+  val frameworks = new TrieMap[String, mutable.HashSet[String]]
+  private var executionQueue: BlockingQueue[ActionData] = _
 
   /**
     * The start method initiates the job execution by executing the first action.
@@ -50,15 +50,6 @@ class JobManager extends Logging {
     */
   def start(): Unit = {
 
-    jobReport.append(
-      s"""
-         | ******************************************************************
-         | * started job with id: $jobId *
-         | ******************************************************************
-         | *                                                                *
-       """.stripMargin
-    )
-    jobReport.append("\n")
     head.execute()
 
   }
@@ -77,7 +68,7 @@ class JobManager extends Logging {
     val nextAction: ActionData = executionQueue.poll()
 
     if (nextAction != null) {
-      registeredActions.get(nextAction.id).get.announceStart
+      registeredActions(nextAction.id).announceStart
     }
 
     nextAction
@@ -85,10 +76,9 @@ class JobManager extends Logging {
 
   def reQueueAction(actionId: String): Unit = {
 
-    jobReport.append(s" *+-> action: $actionId re-queued for execution                         *\n")
-    val action = registeredActions.get(actionId).get
+    val action = registeredActions(actionId)
     executionQueue.put(action.data)
-    registeredActions.get(actionId).get.announceQueued
+    registeredActions(actionId).announceQueued
 
   }
 
@@ -110,7 +100,6 @@ class JobManager extends Logging {
     */
   def actionComplete(actionId: String): Unit = {
 
-    jobReport.append(s" *+-> action: $actionId completed                               *\n")
     val action = registeredActions.get(actionId).get
     action.announceComplete
     action.data.nextActionIds.foreach(id =>
@@ -130,12 +119,6 @@ class JobManager extends Logging {
   def actionFailed(actionId: String, message: String): Unit = {
 
     log.warn(message)
-    jobReport.append(
-      s""" *+-> action: $actionId failed with message:                     *
-           |  $message
-       """.stripMargin
-    )
-    jobReport.append("\n")
 
     val action = registeredActions.get(actionId).get
     val id = action.handleFailure(message)
@@ -160,7 +143,6 @@ class JobManager extends Logging {
     */
   def actionStarted(actionId: String): Unit = {
 
-    jobReport.append(s" *+-> action: $actionId started                                 *\n")
     val action = registeredActions.get(actionId).get
     action.announceStart
 
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
new file mode 100644
index 0000000..67d07a8
--- /dev/null
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
@@ -0,0 +1,45 @@
+package org.apache.amaterasu.leader.execution.frameworks
+
+import java.net.{URL, URLClassLoader}
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.sdk.FrameworkSetupProvider
+import org.reflections.Reflections
+
+import scala.collection.JavaConversions._
+
+class FrameworkProvidersFactory {
+  var providers: Map[String, FrameworkSetupProvider] = _
+
+  def groups: Array[String] = {
+    providers.keys.toArray
+  }
+
+  def getFramework(groupId: String): FrameworkSetupProvider = {
+    providers(groupId)
+  }
+}
+
+object FrameworkProvidersFactory extends Logging {
+
+  def apply(config: ClusterConfig): FrameworkProvidersFactory = {
+
+    val result = new FrameworkProvidersFactory()
+
+    val reflections = new Reflections(getClass.getClassLoader)
+    val runnerTypes = reflections.getSubTypesOf(classOf[FrameworkSetupProvider]).toSet
+
+    result.providers = runnerTypes.map(r => {
+
+      val provider = Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[FrameworkSetupProvider]
+
+      provider.init(config)
+      log.info(s"a provider for group ${provider.getGroupIdentifier} was created")
+      (provider.getGroupIdentifier, provider)
+
+    }).toMap
+
+    result
+  }
+}
\ No newline at end of file
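
A minimal usage sketch of the new factory; the properties path is hypothetical, and ClusterConfig(InputStream) is used the same way in SparkRunnersProvider above:

import java.io.FileInputStream

import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory

object FactorySketch extends App {
  val config = ClusterConfig(new FileInputStream("./amaterasu.properties")) // hypothetical path
  val factory = FrameworkProvidersFactory(config)
  for (group <- factory.groups) {
    val provider = factory.getFramework(group)
    println(s"$group -> ${provider.getGroupResources.length} group resource files")
  }
}
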
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala b/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala
new file mode 100644
index 0000000..8515102
--- /dev/null
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala
@@ -0,0 +1,35 @@
+package org.apache.amaterasu.leader.frameworks.spark
+
+import java.io.File
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.utils.FileUtils
+import org.apache.amaterasu.sdk.FrameworkSetupProvider
+
+import scala.collection.mutable
+
+class SparkSetupProvider extends FrameworkSetupProvider {
+
+  private var conf: ClusterConfig = _
+  private val runnersResources = mutable.Map[String,Array[File]]()
+
+  override def init(conf: ClusterConfig): Unit = {
+    this.conf = conf
+
+    runnersResources += "scala" -> Array.empty[File]
+    runnersResources += "sql" -> Array.empty[File]
+    //TODO: Nadav needs to setup conda here
+    runnersResources += "python" -> Array.empty[File]
+  }
+
+  override def getGroupIdentifier: String = "spark"
+
+  override def getGroupResources: Array[File] = {
+    new File(conf.spark.home).listFiles
+  }
+
+  override def getRunnerResources(runnerId: String): Array[File] = {
+    runnersResources(runnerId)
+  }
+
+}
\ No newline at end of file
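
SparkSetupProvider is the first implementation of the new SDK interface; since providers are discovered reflectively, adding a framework means little more than dropping in another implementation. A hedged sketch of what one for a hypothetical "flink" framework could look like (not part of this PR):

import java.io.File

import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.sdk.FrameworkSetupProvider

// hypothetical provider; the reflective factory only needs a no-arg constructor
class FlinkSetupProvider extends FrameworkSetupProvider {

  private var conf: ClusterConfig = _

  override def init(conf: ClusterConfig): Unit = this.conf = conf

  override def getGroupIdentifier: String = "flink"

  override def getGroupResources: Array[File] = Array.empty[File]

  override def getRunnerResources(runnerId: String): Array[File] = Array.empty[File]
}
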
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index 16bcec2..9844d07 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
@@ -26,11 +26,12 @@ import com.google.gson.Gson
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory
 import org.apache.amaterasu.leader.execution.{JobLoader, JobManager}
 import org.apache.amaterasu.leader.utilities.{Args, DataLoader}
 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants
@@ -44,7 +45,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
-import scala.collection.concurrent
+import scala.collection.{concurrent, mutable}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 import scala.util.{Failure, Success}
@@ -151,7 +152,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
     // Resource requirements for worker containers
     // TODO: this should be per task based on the framework config
     this.capability = Records.newRecord(classOf[Resource])
-    this.capability.setMemory(Math.min(config.taskMem, 512))
+    this.capability.setMemory(Math.min(config.taskMem, 1024))
     this.capability.setVirtualCores(1)
 
     while (!jobManager.outOfActions) {
@@ -210,9 +211,9 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
         val ctx = Records.newRecord(classOf[ContainerLaunchContext])
         val commands: List[String] = List(
           "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && ",
-          s"/bin/bash ${config.spark.home}/bin/load-spark-env.sh && ",
-          s"java -cp ${config.spark.home}/jars/*:executor.jar:${config.spark.home}/conf/:${config.YARN.hadoopHomeDir}/conf/ " +
-            "-Xmx512M " +
+          s"/bin/bash spark/bin/load-spark-env.sh && ",
+          s"java -cp spark/jars/*:executor.jar:${config.spark.home}/conf/:${config.YARN.hadoopHomeDir}/conf/ " +
+            "-Xmx1G " +
             "-Dscala.usejavacp=true " +
             "-Dhdp.version=2.6.1.0-129 " +
             "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
@@ -226,15 +227,25 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
 
         ctx.setCommands(commands)
         ctx.setTokens(allTokens)
-        ctx.setLocalResources(Map[String, LocalResource](
+
+        var resources = mutable.Map[String, LocalResource](
           "executor.jar" -> executorJar,
           "amaterasu.properties" -> propFile,
+          // TODO: Nadav/Eyal all of these should move to the executor resource setup
           "miniconda.sh" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/Miniconda2-latest-Linux-x86_64.sh"))),
           "codegen.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/codegen.py"))),
           "runtime.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/runtime.py"))),
           "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(jarPath,
new Path("/dist/spark-version-info.properties"))),
-          "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/spark_intp.py")))
-        ))
+          "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/spark_intp.py"))))
+
+        val frameworkFactory = FrameworkProvidersFactory(config)
+        val framework = frameworkFactory.getFramework(actionData.groupId)
+
+        //adding the framework and executor resources
+        setupResources(framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
+        setupResources(s"${framework.getGroupIdentifier}/${actionData.typeId}", resources, s"${framework.getGroupIdentifier}-${actionData.typeId}")
+
+        ctx.setLocalResources(resources)
 
         ctx.setEnvironment(Map[String, String](
           "HADOOP_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
@@ -261,6 +272,22 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
     }
   }
 
+  private def setupResources(frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
+
+    val sourcePath = Path.mergePaths(jarPath, new Path(s"/$resourcesPath"))
+
+    if (fs.exists(sourcePath)) {
+
+      val files = fs.listFiles(sourcePath, true)
+
+      while (files.hasNext) {
+        val res = files.next()
+        val containerPath = res.getPath.toUri.getPath.replace("/apps/amaterasu/", "")
+        countainerResources.put(containerPath, setLocalResourceFromPath(res.getPath))
+      }
+    }
+  }
+
   def stopApplication(finalApplicationStatus: FinalApplicationStatus, appMessage: String): Unit = {
     import java.io.IOException
 
diff --git a/leader/src/main/scripts/ama-start-yarn.sh b/leader/src/main/scripts/ama-start-yarn.sh
index 8c0abce..153eba8 100755
--- a/leader/src/main/scripts/ama-start-yarn.sh
+++ b/leader/src/main/scripts/ama-start-yarn.sh
@@ -119,9 +119,9 @@ if [ ! -f ${BASEDIR}/dist/Miniconda2-latest-Linux-x86_64.sh ]; then
 fi
 
 eval "hdfs dfs -rm -R -skipTrash /apps/amaterasu"
-eval "hdfs dfs -mkdir /apps/amaterasu/"
-eval "hdfs dfs -chmod -R 777 /apps/amaterasu/"
-eval "hdfs dfs -copyFromLocal ${BASEDIR}/* /apps/amaterasu/"
+#eval "hdfs dfs -mkdir /apps/amaterasu/"
+#eval "hdfs dfs -chmod -R 777 /apps/amaterasu/"
+#eval "hdfs dfs -copyFromLocal ${BASEDIR}/* /apps/amaterasu/"
 eval $CMD | grep "===>"
 kill $SERVER_PID
 
diff --git a/leader/src/main/scripts/log4j.properties b/leader/src/main/scripts/log4j.properties
index a9d592f..6b40036 100644
--- a/leader/src/main/scripts/log4j.properties
+++ b/leader/src/main/scripts/log4j.properties
@@ -5,4 +5,6 @@ log4j.rootLogger=DEBUG, stdout, file
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.Target=System.out
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
\ No newline at end of file
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+
+log4j.logger.org.reflections=OFF
\ No newline at end of file
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/FrameworkSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/FrameworkSetupProvider.java
new file mode 100644
index 0000000..dc31e4f
--- /dev/null
+++ b/sdk/src/main/java/org/apache/amaterasu/sdk/FrameworkSetupProvider.java
@@ -0,0 +1,17 @@
+package org.apache.amaterasu.sdk;
+
+import org.apache.amaterasu.common.configuration.ClusterConfig;
+
+import java.io.File;
+
+public interface FrameworkSetupProvider {
+
+    void init(ClusterConfig conf);
+
+    String getGroupIdentifier();
+
+    File[] getGroupResources();
+
+    File[] getRunnerResources(String runnerId);
+
+}
\ No newline at end of file
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java
index 35b9e69..01fe266 100644
--- a/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java
+++ b/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java
@@ -33,7 +33,8 @@ void init(ExecData data,
               ByteArrayOutputStream outStream,
               Notifier notifier,
               String executorId,
-              String propFile);
+              String propFile,
+              String hostName);
 
     String getGroupIdentifier();
 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
