From: apalumbo@apache.org
To: commits@mahout.apache.org
Subject: mahout git commit: MAHOUT-1653 and MAHOUT-1685: Upgrade to Spark 1.3. closes apache/mahout#146
Date: Fri, 10 Jul 2015 18:28:53 +0000 (UTC)

Repository: mahout
Updated Branches:
  refs/heads/master 05a97bb4e -> bbb90c21d

MAHOUT-1653 and MAHOUT-1685: Upgrade to Spark 1.3. closes apache/mahout#146

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/bbb90c21
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/bbb90c21
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/bbb90c21

Branch: refs/heads/master
Commit: bbb90c21de046d7368c985570f7bab325485f327
Parents: 05a97bb
Author: Andrew Palumbo
Authored: Fri Jul 10 14:26:00 2015 -0400
Committer: Andrew Palumbo
Committed: Fri Jul 10 14:27:37 2015 -0400

----------------------------------------------------------------------
 CHANGELOG                                        |   3 +
 pom.xml                                          |   2 +-
 .../sparkbindings/shell/MahoutSparkILoop.scala   | 140 ++++++++++++++-----
 .../mahout/sparkbindings/shell/Main.scala        |  14 +-
 .../drm/CheckpointedDrmSpark.scala               |  19 +--
 .../mahout/sparkbindings/drm/package.scala       |   2 +-
 6 files changed, 127 insertions(+), 53 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mahout/blob/bbb90c21/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 50ca6c3..c097121 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,9 @@ Mahout Change Log
 
 Release 0.11.0 - unreleased
 
+  MAHOUT-1685: Move Mahout shell to Spark 1.3+ (dlyubimov, apalumbo)
+
+  MAHOUT-1653: Spark 1.3 (pferrel, apalumbo)
 
   MAHOUT-1754: Distance and squared distance matrices routines (dlyubimov)

http://git-wip-us.apache.org/repos/asf/mahout/blob/bbb90c21/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 25cd27d..68e5286 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,7 +120,7 @@
     <slf4j.version>1.7.10</slf4j.version>
     <scala.compat.version>2.10</scala.compat.version>
     <scala.version>2.10.4</scala.version>
-    <spark.version>1.2.2</spark.version>
+    <spark.version>1.3.1</spark.version>
    <h2o.version>0.1.25</h2o.version>
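A quick way to confirm the version bump after rebuilding is to check the Spark release reported by the running context. This is an illustrative session sketch, not output captured from a real run; it assumes a successful build against Spark 1.3.1:

    mahout> sc.version        // SparkContext.version reports the running Spark release
    res0: String = 1.3.1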
http://git-wip-us.apache.org/repos/asf/mahout/blob/bbb90c21/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
----------------------------------------------------------------------
diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
index 4d0615a..8df93bd 100644
--- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
+++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
@@ -1,16 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.mahout.sparkbindings.shell
 
-import org.apache.spark.repl.SparkILoop
+import org.apache.log4j.PropertyConfigurator
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.repl.SparkILoop
 import scala.tools.nsc.Properties
-import scala.Some
 import org.apache.mahout.sparkbindings._
 
+
 class MahoutSparkILoop extends SparkILoop {
 
-  log.info("Mahout spark shell waking up.")
+  private var _interp: SparkILoop = _
+
+  private var sdc: SparkDistributedContext = _
 
-  private val postInitScript =
+  private val postInitImports =
     "import org.apache.mahout.math._" ::
       "import scalabindings._" ::
       "import RLikeOps._" ::
@@ -21,82 +41,132 @@ class MahoutSparkILoop extends SparkILoop {
       "import collection.JavaConversions._" ::
       Nil
 
-  override protected def postInitialization() {
-    super.postInitialization()
-    val intp: MahoutSparkILoop = this
-    intp.beQuietDuring {
-      postInitScript.foreach(command(_))
-    }
+  def getSparkDistributedContext: SparkDistributedContext = sdc
+
+  // Hack: for some very unclear reason, log4j is not picking up log4j.properties in Spark conf/ even
+  // though the latter is added to the classpath. So we force it to pick it.
+  PropertyConfigurator.configure(getMahoutHome() + "/conf/log4j.properties")
+
+  System.setProperty("scala.usejavacp", "true")
+
+  _interp = this
+
+  // It looks like we need to initialize this too, since some Spark shell initialization code
+  // expects it.
+  org.apache.spark.repl.Main.interp = _interp
+
+  _interp.setPrompt("mahout> ")
+
+  // sparkILoop.echo(...) is private, so we create our own here.
+  def echoToShell(str: String): Unit = {
+    _interp.out.println(str)
   }
 
+  // Create a Spark context as a Mahout SparkDistributedContext and
+  // store the SparkDistributedContext for declaration in the interpreter session.
   override def createSparkContext(): SparkContext = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
-    val master = this.master match {
+    val master = _interp.master match {
       case Some(m) => m
       case None => {
         val prop = System.getenv("MASTER")
         if (prop != null) prop else "local"
       }
     }
+
     val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
-    val conf = new SparkConf()
-      .set("spark.repl.class.uri", intp.classServer.uri)
+    val conf = new SparkConf().set("spark.repl.class.uri", _interp.classServerUri)
+
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri)
     }
-    conf.set("spark.executor.memory", "1g")
+
+    // set the default value of spark.executor.memory to 1g
+    if (!conf.contains("spark.executor.memory")) {
+      conf.set("spark.executor.memory", "1g")
+    }
 
-    sparkContext = mahoutSparkContext(
+    sdc = mahoutSparkContext(
       masterUrl = master,
       appName = "Mahout Spark Shell",
       customJars = jars,
       sparkConf = conf
     )
-    echo("Created spark context..")
+
+    _interp.sparkContext = sdc
+
+    echoToShell("Created Spark context.")
     sparkContext
   }
 
+  // This is technically not part of Spark's explicitly defined Developer API, though
+  // nothing in the SparkILoopInit.scala file is marked as such.
   override def initializeSpark() {
-    intp.beQuietDuring {
-      command("""
-         @transient implicit val sdc: org.apache.mahout.math.drm.DistributedContext =
-            new org.apache.mahout.sparkbindings.SparkDistributedContext(
-            org.apache.spark.repl.Main.interp.createSparkContext())
+    _interp.beQuietDuring {
+
+      // get the Spark context; at the same time, create and store a Mahout distributed context.
+      _interp.interpret("""
+         @transient val sc = {
+           val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
+           _sc
+         }
+        """)
+      echoToShell("Spark context is available as \"val sc\".")
+
+      // retrieve the stored Mahout SparkDistributedContext.
+      _interp.interpret("""
+         @transient implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext =
+           org.apache.spark.repl.Main.interp
+             .asInstanceOf[org.apache.mahout.sparkbindings.shell.MahoutSparkILoop]
+             .getSparkDistributedContext
+        """)
+      echoToShell("Mahout distributed context is available as \"implicit val sdc\".")
+
+      // create a SQL context.
+      _interp.interpret("""
+         @transient val sqlContext = {
+           val _sqlContext = org.apache.spark.repl.Main.interp.createSQLContext()
+           _sqlContext
+         }
+        """)
+      _interp.interpret("import org.apache.spark.SparkContext._")
+      _interp.interpret("import sqlContext.implicits._")
+      _interp.interpret("import sqlContext.sql")
+      _interp.interpret("import org.apache.spark.sql.functions._")
+      echoToShell("SQL context available as \"val sqlContext\".")
 
-      """)
-      command("import org.apache.spark.SparkContext._")
-      echo("Mahout distributed context is available as \"implicit val sdc\".")
     }
   }
 
-  override def sparkCleanUp() {
-    echo("Stopping Spark context.")
-    intp.beQuietDuring {
-      command("sdc.stop()")
+  // This is technically not part of Spark's explicitly defined Developer API, though
+  // nothing in the SparkILoopInit.scala file is marked as such.
+  override protected def postInitialization() {
+    super.postInitialization()
+    _interp.beQuietDuring {
+      postInitImports.foreach(_interp.interpret(_))
     }
   }
 
-  override def prompt: String = "mahout> "
-
+  // This is technically not part of Spark's explicitly defined Developer API, though
+  // nothing in the SparkILoopInit.scala file is marked as such.
   override def printWelcome(): Unit = {
-    echo(
+    echoToShell(
      """
                        _                 _
        _ __ ___   __ _| |__   ___  _   _| |_
       | '_ ` _ \ / _` | '_ \ / _ \| | | | __|
       | | | | | | (_| | | | | (_) | |_| | |_
-      |_| |_| |_|\__,_|_| |_|\___/ \__,_|\__|  version 0.10.0
+      |_| |_| |_|\__,_|_| |_|\___/ \__,_|\__|  version 0.11.0
 
      """)
     import Properties._
     val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
       versionString, javaVmName, javaVersion)
-    echo(welcomeMsg)
-    echo("Type in expressions to have them evaluated.")
-    echo("Type :help for more information.")
+    echoToShell(welcomeMsg)
+    echoToShell("Type in expressions to have them evaluated.")
+    echoToShell("Type :help for more information.")
   }
+
 }
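For illustration only (not part of the commit): with sc, the implicit sdc, and sqlContext wired up at startup as above, and the postInitImports already issued, a session might look like the sketch below. The matrix values and echoed result types are assumptions, not output from a real run:

    mahout> val drmA = drmParallelize(dense((1.0, 2.0), (3.0, 4.0)))   // resolves the implicit sdc
    drmA: org.apache.mahout.math.drm.CheckpointedDrm[Int] = ...

    mahout> val gram = (drmA.t %*% drmA).collect                       // A'A runs on Spark, result gathered in-core
    gram: org.apache.mahout.math.Matrix = ...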
http://git-wip-us.apache.org/repos/asf/mahout/blob/bbb90c21/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
----------------------------------------------------------------------
diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
index b59c9a7..32bba32 100644
--- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
+++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
@@ -17,26 +17,24 @@
 
 package org.apache.mahout.sparkbindings.shell
 
-import org.apache.mahout.sparkbindings._
 import org.apache.log4j.PropertyConfigurator
+import org.apache.spark.repl.SparkILoop
+import org.apache.mahout.sparkbindings._
 
 object Main {
 
+  private var _interp: SparkILoop = _
-  private var _interp: MahoutSparkILoop = _
-
-  def main(args:Array[String]) {
-
-    // Hack: for some very unclear reason, log4j is not picking up log4j.properties in Spark conf/ even
-    // though the latter is added to the classpath. So we force it to pick it.
+  def main(args: Array[String]) {
     PropertyConfigurator.configure(getMahoutHome() + "/conf/log4j.properties")
 
     System.setProperty("scala.usejavacp", "true")
 
     _interp = new MahoutSparkILoop()
 
+    // It looks like we need to initialize this too, since some Spark shell initialization code
     // expects it.
     org.apache.spark.repl.Main.interp = _interp
 
     _interp.process(args)
-  }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/bbb90c21/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 41efc27..2f5d600 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -29,7 +29,6 @@ import scala.util.Random
 import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
 import org.apache.mahout.math.drm._
 import org.apache.mahout.sparkbindings._
-import org.apache.spark.SparkContext._
 
 /** ==Spark-specific optimizer-checkpointed DRM.==
  *
@@ -158,14 +157,18 @@ class CheckpointedDrmSpark[K: ClassTag](
   def dfsWrite(path: String) = {
     val ktag = implicitly[ClassTag[K]]
 
-    implicit val k2wFunc: (K) => Writable =
-      if (ktag.runtimeClass == classOf[Int]) (x: K) => new IntWritable(x.asInstanceOf[Int])
-      else if (ktag.runtimeClass == classOf[String]) (x: K) => new Text(x.asInstanceOf[String])
-      else if (ktag.runtimeClass == classOf[Long]) (x: K) => new LongWritable(x.asInstanceOf[Long])
-      else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => x.asInstanceOf[Writable]
-      else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
 
+    // Map the backing RDD[(K, Vector)] to RDD[(Writable, VectorWritable)] and save.
+    if (ktag.runtimeClass == classOf[Int]) {
+      rddInput.toDrmRdd()
+        .map(x => (new IntWritable(x._1.asInstanceOf[Int]), new VectorWritable(x._2))).saveAsSequenceFile(path)
+    } else if (ktag.runtimeClass == classOf[String]) {
+      rddInput.toDrmRdd()
+        .map(x => (new Text(x._1.asInstanceOf[String]), new VectorWritable(x._2))).saveAsSequenceFile(path)
+    } else if (ktag.runtimeClass == classOf[Long]) {
+      rddInput.toDrmRdd()
+        .map(x => (new LongWritable(x._1.asInstanceOf[Long]), new VectorWritable(x._2))).saveAsSequenceFile(path)
+    } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
 
-    rddInput.toDrmRdd().saveAsSequenceFile(path)
   }
 
   protected def computeNRow = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/bbb90c21/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
index 0de5ff8..e18d6da 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConversions._
 import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
 import org.apache.log4j.Logger
 import java.lang.Math
-import org.apache.spark.rdd.{FilteredRDD, RDD}
+import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
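A closing usage note on the dfsWrite change above (illustrative, not part of the commit): each supported key type (Int, String, Long) is now mapped explicitly to its Writable counterpart before saveAsSequenceFile, so a DRM written this way round-trips through drmDfsRead. The path below is hypothetical, and an implicit DistributedContext, for example the shell's sdc, is assumed to be in scope:

    val drmA = drmParallelize(dense((1.0, 2.0), (3.0, 4.0)))
    drmA.dfsWrite("/tmp/drmA")           // Int keys -> (IntWritable, VectorWritable) sequence file
    val drmB = drmDfsRead("/tmp/drmA")   // reads the sequence file back as a DRM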