mahout-commits mailing list archives

From apalu...@apache.org
Subject mahout git commit: MAHOUT-1653 and MAHOUT-1685: Upgrade to Spark 1.3. closes apache/mahout#146
Date Fri, 10 Jul 2015 18:28:53 GMT
Repository: mahout
Updated Branches:
  refs/heads/master 05a97bb4e -> bbb90c21d


MAHOUT-1653 and MAHOUT-1685: Upgrade to Spark 1.3. closes apache/mahout#146


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/bbb90c21
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/bbb90c21
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/bbb90c21

Branch: refs/heads/master
Commit: bbb90c21de046d7368c985570f7bab325485f327
Parents: 05a97bb
Author: Andrew Palumbo <apalumbo@apache.org>
Authored: Fri Jul 10 14:26:00 2015 -0400
Committer: Andrew Palumbo <apalumbo@apache.org>
Committed: Fri Jul 10 14:27:37 2015 -0400

----------------------------------------------------------------------
 CHANGELOG                                       |   3 +
 pom.xml                                         |   2 +-
 .../sparkbindings/shell/MahoutSparkILoop.scala  | 140 ++++++++++++++-----
 .../mahout/sparkbindings/shell/Main.scala       |  14 +-
 .../drm/CheckpointedDrmSpark.scala              |  19 +--
 .../mahout/sparkbindings/drm/package.scala      |   2 +-
 6 files changed, 127 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/bbb90c21/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 50ca6c3..c097121 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,9 @@ Mahout Change Log
 
 Release 0.11.0 - unreleased
 
+  MAHOUT-1685: Move Mahout shell to Spark 1.3+ (dlyubimov, apalumbo)
+
+  MAHOUT-1653: Spark 1.3 (pferrel, apalumbo)
 
   MAHOUT-1754: Distance and squared distance matrices routines (dlyubimov)
     

http://git-wip-us.apache.org/repos/asf/mahout/blob/bbb90c21/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 25cd27d..68e5286 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,7 +120,7 @@
     <slf4j.version>1.7.10</slf4j.version>
     <scala.compat.version>2.10</scala.compat.version>
     <scala.version>2.10.4</scala.version>
-    <spark.version>1.2.2</spark.version>
+    <spark.version>1.3.1</spark.version>
     <h2o.version>0.1.25</h2o.version>
   </properties>
   <issueManagement>
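
As a quick sanity check after this version bump, the running Spark version can be inspected from the Mahout shell. A minimal hedged snippet (it assumes a SparkContext named sc is already in scope, as the shell below declares; sc.version is standard Spark API):

// Hedged sketch, not part of this commit: confirm the shell is running on Spark 1.3.x.
require(sc.version.startsWith("1.3"), s"Expected Spark 1.3.x, found ${sc.version}")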

http://git-wip-us.apache.org/repos/asf/mahout/blob/bbb90c21/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
----------------------------------------------------------------------
diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
index 4d0615a..8df93bd 100644
--- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
+++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
@@ -1,16 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.mahout.sparkbindings.shell
 
-import org.apache.spark.repl.SparkILoop
+import org.apache.log4j.PropertyConfigurator
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.repl.SparkILoop
 import scala.tools.nsc.Properties
-import scala.Some
 import org.apache.mahout.sparkbindings._
 
+
 class MahoutSparkILoop extends SparkILoop {
 
-  log.info("Mahout spark shell waking up.")
+  private var _interp: SparkILoop = _
+
+  private var sdc: SparkDistributedContext = _
 
-  private val postInitScript =
+  private val postInitImports =
     "import org.apache.mahout.math._" ::
       "import scalabindings._" ::
       "import RLikeOps._" ::
@@ -21,82 +41,132 @@ class MahoutSparkILoop extends SparkILoop {
       "import collection.JavaConversions._" ::
       Nil
 
-  override protected def postInitialization() {
-    super.postInitialization()
-    val intp: MahoutSparkILoop = this
-    intp.beQuietDuring {
-      postInitScript.foreach(command(_))
-    }
+  def getSparkDistributedContext: SparkDistributedContext  = sdc
+
+  // Hack: for some very unclear reason, log4j is not picking up log4j.properties in Spark conf/ even
+  // though the latter is added to the classpath. So we force it to pick it up.
+  PropertyConfigurator.configure(getMahoutHome() + "/conf/log4j.properties")
+
+  System.setProperty("scala.usejavacp", "true")
+
+  _interp = this
+
+  // It looks like we need to initialize this too, since some Spark shell initialization code
+  // expects it
+  org.apache.spark.repl.Main.interp = _interp
+
+  _interp.setPrompt("mahout> ")
+
+  // sparkILoop.echo(...) is private so we create our own here.
+  def echoToShell(str: String): Unit = {
+    _interp.out.println(str)
   }
 
+  // create a Spark context as a Mahout SparkDistributedContext.
+  // store the SparkDistributedContext for declaration in the interpreter session.
   override def createSparkContext(): SparkContext = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
-    val master = this.master match {
+    val master = _interp.master match {
       case Some(m) => m
       case None => {
         val prop = System.getenv("MASTER")
         if (prop != null) prop else "local"
       }
     }
+
     val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
-    val conf = new SparkConf()
-        .set("spark.repl.class.uri", intp.classServer.uri)
+    val conf = new SparkConf().set("spark.repl.class.uri", _interp.classServerUri)
 
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri)
     }
 
-    conf.set("spark.executor.memory", "1g")
+    // set default value of spark.executor.memory to 1g
+    if(!conf.contains("spark.executor.memory")) {
+      conf.set("spark.executor.memory", "1g")
+    }
 
-    sparkContext = mahoutSparkContext(
+    sdc = mahoutSparkContext(
       masterUrl = master,
       appName = "Mahout Spark Shell",
       customJars = jars,
       sparkConf = conf
     )
 
-    echo("Created spark context..")
+    _interp.sparkContext = sdc
+
+    echoToShell("Created spark context..")
     sparkContext
   }
 
+  // this is technically not part of Spark's explicitly defined Developer API though
+  // nothing in the SparkILoopInit.scala file is marked as such.
   override def initializeSpark() {
-    intp.beQuietDuring {
-      command("""
 
-         @transient implicit val sdc: org.apache.mahout.math.drm.DistributedContext =
-            new org.apache.mahout.sparkbindings.SparkDistributedContext(
-            org.apache.spark.repl.Main.interp.createSparkContext())
+    _interp.beQuietDuring {
+
+      // get the Spark context and, at the same time, create and store a Mahout distributed context.
+      _interp.interpret("""
+         @transient val sc = {
+           val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
+           _sc
+         }
+                        """)
+      echoToShell("Spark context is available as \"val sc\".")
+
+      // retrieve the stored mahout SparkDistributedContext.
+      _interp.interpret("""
+         @transient implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext =
+            org.apache.spark.repl.Main.interp
+             .asInstanceOf[org.apache.mahout.sparkbindings.shell.MahoutSparkILoop]
+             .getSparkDistributedContext
+                        """)
+      echoToShell("Mahout distributed context is available as \"implicit val sdc\".")
+
+      // create a SQL Context.
+      _interp.interpret("""
+         @transient val sqlContext = {
+           val _sqlContext = org.apache.spark.repl.Main.interp.createSQLContext()
+           _sqlContext
+         }
+                        """)
+      _interp.interpret("import org.apache.spark.SparkContext._")
+      _interp.interpret("import sqlContext.implicits._")
+      _interp.interpret("import sqlContext.sql")
+      _interp.interpret("import org.apache.spark.sql.functions._")
+      echoToShell("SQL context available as \"val sqlContext\".")
 
-              """)
-      command("import org.apache.spark.SparkContext._")
-      echo("Mahout distributed context is available as \"implicit val sdc\".")
     }
   }
 
-  override def sparkCleanUp() {
-    echo("Stopping Spark context.")
-    intp.beQuietDuring {
-      command("sdc.stop()")
+  // this is technically not part of Spark's explicitly defined Developer API though
+  // nothing in the SparkILoopInit.scala file is marked as such.
+  override protected def postInitialization() {
+    super.postInitialization()
+    _interp.beQuietDuring {
+      postInitImports.foreach(_interp.interpret(_))
     }
   }
 
-  override def prompt: String = "mahout> "
-
+  // this is technically not part of Spark's explicitly defined Developer API though
+  // nothing in the SparkILoopInit.scala file is marked as such.
   override def printWelcome(): Unit = {
-    echo(
+    echoToShell(
       """
                          _                 _
          _ __ ___   __ _| |__   ___  _   _| |_
         | '_ ` _ \ / _` | '_ \ / _ \| | | | __|
         | | | | | | (_| | | | | (_) | |_| | |_
-        |_| |_| |_|\__,_|_| |_|\___/ \__,_|\__|  version 0.10.0
+        |_| |_| |_|\__,_|_| |_|\___/ \__,_|\__|  version 0.11.0
 
       """)
     import Properties._
     val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
       versionString, javaVmName, javaVersion)
-    echo(welcomeMsg)
-    echo("Type in expressions to have them evaluated.")
-    echo("Type :help for more information.")
+    echoToShell(welcomeMsg)
+    echoToShell("Type in expressions to have them evaluated.")
+    echoToShell("Type :help for more information.")
   }
+
 }
+
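
To make the context wiring above easier to follow, here is a small standalone sketch (not part of the commit) of the pattern createSparkContext() uses: build a SparkConf, default spark.executor.memory only if unset, and obtain a Mahout SparkDistributedContext from the mahoutSparkContext() factory. The object name, master URL and print statement are illustrative assumptions; the implicit SparkDistributedContext-to-SparkContext conversion is the same one that lets the loop assign _interp.sparkContext = sdc.

import org.apache.mahout.sparkbindings._
import org.apache.spark.{SparkConf, SparkContext}

object ShellContextSketch extends App {
  // Mirror the shell's behaviour: only default executor memory if the user has not set it.
  val conf = new SparkConf()
  if (!conf.contains("spark.executor.memory")) conf.set("spark.executor.memory", "1g")

  // mahoutSparkContext(...) returns a SparkDistributedContext wrapping a SparkContext,
  // using the same named parameters createSparkContext() passes above.
  val sdc: SparkDistributedContext = mahoutSparkContext(
    masterUrl = "local",
    appName = "Mahout context sketch",
    customJars = Nil,
    sparkConf = conf)

  // The sparkbindings package object provides an implicit conversion back to SparkContext.
  val sc: SparkContext = sdc
  println(s"Spark ${sc.version} context is up.")
  sc.stop()
}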

http://git-wip-us.apache.org/repos/asf/mahout/blob/bbb90c21/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
----------------------------------------------------------------------
diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
index b59c9a7..32bba32 100644
--- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
+++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
@@ -17,26 +17,24 @@
 
 package org.apache.mahout.sparkbindings.shell
 
-import org.apache.mahout.sparkbindings._
 import org.apache.log4j.PropertyConfigurator
+import org.apache.spark.repl.SparkILoop
+import org.apache.mahout.sparkbindings._
 
 
 object Main {
+  private var _interp: SparkILoop = _
 
-  private var _interp: MahoutSparkILoop = _
-
-  def main(args:Array[String]) {
-
-    // Hack: for some very unclear reason, log4j is not picking up log4j.properties in Spark conf/ even
-    // though the latter is added to the classpath. So we force it to pick it.
+  def main(args: Array[String]) {
     PropertyConfigurator.configure(getMahoutHome() + "/conf/log4j.properties")
 
     System.setProperty("scala.usejavacp", "true")
     _interp = new MahoutSparkILoop()
+
     // It looks like we need to initialize this too, since some Spark shell initialization code
     // expects it
     org.apache.spark.repl.Main.interp = _interp
     _interp.process(args)
-  }
 
+  }
 }
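
Usage note: the Main object above is the entry point the Mahout shell launcher runs (typically via bin/mahout spark-shell, assuming MAHOUT_HOME and SPARK_HOME are set; the launcher name is not part of this diff). Once MahoutSparkILoop.initializeSpark() has executed, the values it declares should be available at the prompt, for example:

mahout> sc.version     // Spark context, declared as "val sc"
mahout> sdc            // Mahout distributed context, declared as "implicit val sdc"
mahout> sqlContext     // SQL context, declared as "val sqlContext"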

http://git-wip-us.apache.org/repos/asf/mahout/blob/bbb90c21/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 41efc27..2f5d600 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -29,7 +29,6 @@ import scala.util.Random
 import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
 import org.apache.mahout.math.drm._
 import org.apache.mahout.sparkbindings._
-import org.apache.spark.SparkContext._
 
 /** ==Spark-specific optimizer-checkpointed DRM.==
   *
@@ -158,14 +157,18 @@ class CheckpointedDrmSpark[K: ClassTag](
   def dfsWrite(path: String) = {
     val ktag = implicitly[ClassTag[K]]
 
-    implicit val k2wFunc: (K) => Writable =
-      if (ktag.runtimeClass == classOf[Int]) (x: K) => new IntWritable(x.asInstanceOf[Int])
-      else if (ktag.runtimeClass == classOf[String]) (x: K) => new Text(x.asInstanceOf[String])
-      else if (ktag.runtimeClass == classOf[Long]) (x: K) => new LongWritable(x.asInstanceOf[Long])
-      else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => x.asInstanceOf[Writable]
-      else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
+    // Map the backing RDD[(K, Vector)] to an RDD[(Writable, VectorWritable)] and save it as a sequence file.
+    if (ktag.runtimeClass == classOf[Int]) {
+      rddInput.toDrmRdd()
+        .map( x => (new IntWritable(x._1.asInstanceOf[Int]), new VectorWritable(x._2))).saveAsSequenceFile(path)
+    } else if (ktag.runtimeClass == classOf[String]){
+      rddInput.toDrmRdd()
+        .map( x => (new Text(x._1.asInstanceOf[String]), new VectorWritable(x._2))).saveAsSequenceFile(path)
+    } else if (ktag.runtimeClass == classOf[Long]) {
+      rddInput.toDrmRdd()
+        .map( x => (new LongWritable(x._1.asInstanceOf[Long]), new VectorWritable(x._2))).saveAsSequenceFile(path)
+    } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
 
-    rddInput.toDrmRdd().saveAsSequenceFile(path)
   }
 
   protected def computeNRow = {
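
The rewritten dfsWrite() above replaces the implicit K-to-Writable conversion with explicit per-key-type branches that map the backing RDD to (Writable, VectorWritable) pairs before calling saveAsSequenceFile. Below is a hedged standalone sketch of that mapping for the Int-keyed branch; the object name, master URL and output path are illustrative and not part of the commit.

import org.apache.hadoop.io.IntWritable
import org.apache.mahout.math.{DenseVector, VectorWritable}
import org.apache.spark.{SparkConf, SparkContext}

object DfsWriteSketch extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("dfsWrite sketch"))

  // Rows keyed by Int, values kept as plain arrays so the base RDD stays Java-serializable.
  val rows = sc.parallelize(Seq(0 -> Array(1.0, 2.0), 1 -> Array(3.0, 4.0)))

  // Same shape as the Int branch of dfsWrite(): (IntWritable, VectorWritable) pairs
  // written out as a Hadoop sequence file.
  rows.map { case (k, values) =>
    (new IntWritable(k), new VectorWritable(new DenseVector(values)))
  }.saveAsSequenceFile("/tmp/drm-write-sketch")

  sc.stop()
}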

http://git-wip-us.apache.org/repos/asf/mahout/blob/bbb90c21/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
index 0de5ff8..e18d6da 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConversions._
 import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
 import org.apache.log4j.Logger
 import java.lang.Math
-import org.apache.spark.rdd.{FilteredRDD, RDD}
+import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._

