mahout-commits mailing list archives

From: dlyubi...@apache.org
Subject: git commit: Revert "MAHOUT-1615: SparkEngine drmFromHDFS returning the same Key for all Key, Vec Pairs for Text-Keyed SequenceFiles. this closes apache/mahout #52"
Date: Sat, 27 Sep 2014 17:57:58 GMT
Repository: mahout
Updated Branches:
  refs/heads/master 034afd611 -> 3c88f5043


Revert "MAHOUT-1615: SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs
for Text-Keyed SequenceFiles. this closes apache/mahout #52"

This reverts commit 034afd61126660b6f44b232fdf3e1dd9d1a79708.
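
Context for readers of this revert (not part of the commit message): the bug
tracked by MAHOUT-1615 comes from Hadoop's SequenceFile readers recycling a
single mutable Writable instance for the key, so code that retains the raw
Text keys sees every (Key, Vec) pair collapse onto the last key read. A
minimal sketch of the hazard in Spark terms; `loadTextKeyed` is an
illustrative helper, not Mahout API:

    import org.apache.hadoop.io.Text
    import org.apache.mahout.math.VectorWritable
    import org.apache.spark.SparkContext

    def loadTextKeyed(sc: SparkContext, path: String) = {
      val raw = sc.sequenceFile(path, classOf[Text], classOf[VectorWritable])

      // Broken: the reader reuses one Text object per split, so keeping the
      // Writable itself yields many references to the same mutated key.
      // val bad = raw.map { case (k, v) => k -> v.get() }

      // Safe: materialize the key into an immutable String immediately.
      raw.map { case (k, v) => k.toString -> v.get() }
    }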


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/3c88f504
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/3c88f504
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/3c88f504

Branch: refs/heads/master
Commit: 3c88f50432dc8dd70ab7eb03e1bbf419f3f7ab81
Parents: 034afd6
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Authored: Sat Sep 27 10:57:24 2014 -0700
Committer: Dmitriy Lyubimov <dlyubimov@apache.org>
Committed: Sat Sep 27 10:57:24 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 -
 .../apache/mahout/h2obindings/H2OEngine.scala   |  2 +-
 .../h2obindings/drm/CheckpointedDrmH2O.scala    |  4 +-
 .../mahout/math/drm/CheckpointedDrm.scala       | 10 +--
 .../mahout/math/drm/DistributedEngine.scala     |  2 +-
 .../org/apache/mahout/math/drm/DrmLike.scala    |  2 -
 .../org/apache/mahout/math/drm/package.scala    |  2 +-
 .../mahout/math/drm/DrmLikeSuiteBase.scala      |  8 +--
 pom.xml                                         |  2 +-
 .../sparkbindings/shell/MahoutSparkILoop.scala  |  3 -
 .../org/apache/mahout/common/DrmMetadata.scala  | 54 ----------------
 .../org/apache/mahout/common/HDFSUtil.scala     | 26 --------
 .../apache/mahout/common/Hadoop1HDFSUtil.scala  | 65 --------------------
 .../mahout/sparkbindings/SparkEngine.scala      | 57 +++++++++++------
 .../drm/CheckpointedDrmSpark.scala              |  5 +-
 15 files changed, 48 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index bc1edcf..e650428 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,8 +2,6 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
-  MAHOUT-1615: SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs for Text-Keyed SequenceFiles (dlyubimov & apalumbo)
-
   MAHOUT-1610: Update tests to pass in Java 8 (srowen)
 
   MAHOUT-1608: Add option in WikipediaToSequenceFile to remove category labels from documents (apalumbo)

http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 06125fe..54d950b 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -54,7 +54,7 @@ object H2OEngine extends DistributedEngine {
     *
     *  @return DRM[Any] where Any is automatically translated to value type
     */
-  def drmDfsRead(path: String, parMin: Int = 0)(implicit dc: DistributedContext): CheckpointedDrm[_] =
+  def drmFromHDFS(path: String, parMin: Int = 0)(implicit dc: DistributedContext): CheckpointedDrm[_] =
     new CheckpointedDrmH2O(H2OHdfs.drmFromFile(path, parMin), dc)
 
   /** This creates an empty DRM with specified number of partitions and cardinality. */

http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
index c06455a..15af5e2 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
@@ -29,8 +29,6 @@ class CheckpointedDrmH2O[K: ClassTag](
     */
   def collect: Matrix = H2OHelper.matrixFromDrm(h2odrm)
 
-  /** Explicit extraction of key class Tag   */
-  def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]]
 
   /* XXX: call frame.remove */
   def uncache(): this.type = this
@@ -38,7 +36,7 @@ class CheckpointedDrmH2O[K: ClassTag](
   /**
     * Persist DRM to on-disk over HDFS in Mahout DRM format.
     */
-  def dfsWrite(path: String): Unit = H2OHdfs.drmToFile(path, h2odrm)
+  def writeDRM(path: String): Unit = H2OHdfs.drmToFile(path, h2odrm)
 
   /**
     * Action operator - Eagerly evaluate the lazily built operator graph to create

http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
index 082e5b9..28fb7fd 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -18,7 +18,6 @@
 package org.apache.mahout.math.drm
 
 import org.apache.mahout.math.Matrix
-import scala.reflect.ClassTag
 
 /**
  * Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can be
@@ -29,16 +28,9 @@ trait CheckpointedDrm[K] extends DrmLike[K] {
 
   def collect: Matrix
 
-  def dfsWrite(path: String)
+  def writeDRM(path: String)
 
   /** If this checkpoint is already declared cached, uncache. */
   def uncache(): this.type
 
-  /**
-   * Explicit extraction of key class Tag since traits don't support context bound access; but actual
-   * implementation knows it
-   */
-  def keyClassTag: ClassTag[K]
-
-
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index eaf5aeb..d89cc53 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -68,7 +68,7 @@ trait DistributedEngine {
    * @param path The DFS path to load from
    * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
    */
-  def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_]
+  def drmFromHDFS(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_]
 
   /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
   def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)

http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
index b9c50b0..97fe989 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -17,8 +17,6 @@
 
 package org.apache.mahout.math.drm
 
-import scala.reflect.ClassTag
-
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index b787ec0..02e8b7a 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -49,7 +49,7 @@ package object drm {
   def drmBroadcast(v:Vector)(implicit ctx:DistributedContext):BCast[Vector] = ctx.drmBroadcast(v)
 
   /** Load DRM from hdfs (as in Mahout DRM format) */
-  def drmDfsRead (path: String)(implicit ctx: DistributedContext): CheckpointedDrm[_] = ctx.drmDfsRead(path)
+  def drmFromHDFS (path: String)(implicit ctx: DistributedContext): CheckpointedDrm[_] = ctx.drmFromHDFS(path)
 
   /** Shortcut to parallelizing matrices with indices, ignore row labels. */
   def drmParallelize(m: Matrix, numPartitions: Int = 1)

http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
index 6c9313c..7a13124 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
@@ -23,7 +23,6 @@ import org.apache.mahout.math._
 import scalabindings._
 import RLikeOps._
 import RLikeDrmOps._
-import scala.reflect.ClassTag
 
 /** Common DRM tests to be run by all distributed engines. */
 trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers {
@@ -36,15 +35,12 @@ trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers {
     val inCoreA = dense((1, 2, 3), (3, 4, 5))
     val drmA = drmParallelize(inCoreA)
 
-    drmA.dfsWrite(path = uploadPath)
+    drmA.writeDRM(path = uploadPath)
 
     println(inCoreA)
 
     // Load back from hdfs
-    val drmB = drmDfsRead(path = uploadPath)
-
-    // Make sure keys are correctly identified as ints
-    drmB.checkpoint(CacheHint.NONE).keyClassTag shouldBe ClassTag.Int
+    val drmB = drmFromHDFS(path = uploadPath)
 
     // Collect back into in-core
     val inCoreB = drmB.collect

http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3696eb5..89ed1a7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -701,7 +701,7 @@
     <module>math-scala</module>
     <module>spark</module>
     <module>spark-shell</module>
-    <!--module>h2o</module -->
+    <module>h2o</module>
   </modules>
   <profiles>
     <profile>

http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
----------------------------------------------------------------------
diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
index 107fb1e..0df42a3 100644
--- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
+++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
@@ -45,9 +45,6 @@ class MahoutSparkILoop extends SparkILoop {
       conf.set("spark.executor.uri", execUri)
     }
 
-    // TODO:XXX remove this beforre pushing to apache/master
-    conf.set("spark.kryoserializer.buffer.mb", "100")
-
     sparkContext = mahoutSparkContext(
       masterUrl = master,
       appName = "Mahout Spark Shell",

http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala b/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala
deleted file mode 100644
index e65b6d7..0000000
--- a/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.mahout.common
-
-import scala.reflect.ClassTag
-import org.apache.hadoop.io._
-
-class DrmMetadata(
-
-    /** Writable  key type as a sub-type of Writable */
-    val keyTypeWritable: Class[_],
-
-    /** Value writable type, as a sub-type of Writable */
-    val valueTypeWritable: Class[_]
-
-    ) {
-
-  import DrmMetadata._
-
-  val (
-
-      /** Actual drm key class tag once converted out of writable */
-      keyClassTag: ClassTag[_],
-
-      /** Conversion from Writable to value type of the DRM key */
-      keyW2ValFunc: ((Writable) => Any)
-
-      ) = keyTypeWritable match {
-    case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _
-    case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _
-    case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double -> w2double _
-    case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float _
-    case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) -> w2string _
-    case cz if (cz == classOf[BooleanWritable]) => ClassTag(classOf[Boolean]) -> w2bool _
-    case cz if (cz == classOf[ArrayWritable]) => ClassTag(classOf[Array[Byte]]) -> w2bytes _
-    case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}")
-  }
-
-}
-
-object DrmMetadata {
-
-  private[common] def w2int(w: Writable) = w.asInstanceOf[IntWritable].get()
-
-  private[common] def w2long(w: Writable) = w.asInstanceOf[LongWritable].get()
-
-  private[common] def w2double(w: Writable) = w.asInstanceOf[DoubleWritable].get()
-
-  private[common] def w2float(w: Writable) = w.asInstanceOf[FloatWritable].get()
-
-  private[common] def w2string(w: Writable) = w.asInstanceOf[Text].toString()
-
-  private[common] def w2bool(w: Writable) = w.asInstanceOf[BooleanWritable].get()
-
-  private[common] def w2bytes(w: Writable) = w.asInstanceOf[BytesWritable].copyBytes()
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/spark/src/main/scala/org/apache/mahout/common/HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/HDFSUtil.scala
deleted file mode 100644
index f5f87d7..0000000
--- a/spark/src/main/scala/org/apache/mahout/common/HDFSUtil.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.common
-
-/** High level Hadoop version-specific hdfs manipulations we need in context of our operations. */
-trait HDFSUtil {
-
-  /** Read DRM header information off (H)DFS. */
-  def readDrmHeader(path:String):DrmMetadata
-}
-

http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
deleted file mode 100644
index 2fb7190..0000000
--- a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.common
-
-import org.apache.hadoop.io.{Writable, SequenceFile}
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.conf.Configuration
-import collection._
-import JavaConversions._
-
-/**
- * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
- * with Hadoop 2.0
- */
-object Hadoop1HDFSUtil extends HDFSUtil {
-
-  
-  def readDrmHeader(path: String): DrmMetadata = {
-    val dfsPath = new Path(path)
-    val fs = dfsPath.getFileSystem(new Configuration())
-
-    val partFilePath:Path = fs.listStatus(dfsPath)
-
-        // Filter out anything starting with .
-        .filter { s => !s.getPath.getName.startsWith("\\.") && s.isFile}
-
-        // Take path
-        .map(_.getPath)
-
-        // Take only one, if any
-        .headOption
-
-        // Require there's at least one partition file found.
-        .getOrElse {
-      throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
-    }
-
-    val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
-    try {
-      new DrmMetadata(
-        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
-        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
-      )
-    } finally {
-      reader.close()
-    }
-
-  }
-
-}
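
For reference, the two helpers deleted above composed as follows before this
revert: read the key/value Writable classes off one part file's header, then
use the derived converter to materialize keys eagerly after the read. A
sketch; the path and the commented rdd line are illustrative:

    import org.apache.hadoop.io.Writable
    import org.apache.mahout.common.Hadoop1HDFSUtil

    // Reads key/value Writable classes from a part file's header.
    val meta = Hadoop1HDFSUtil.readDrmHeader("/tmp/drmA")  // DrmMetadata
    val toKey: Writable => Any = meta.keyW2ValFunc

    // Eager conversion right after the sequence-file read:
    // rdd.map { case (wKey, wVec) => toKey(wKey) -> wVec.get() }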

http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 08b2c34..54f33ef 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -17,10 +17,7 @@
 
 package org.apache.mahout.sparkbindings
 
-import java.io.IOException
-
 import org.apache.mahout.math._
-import org.apache.spark.deploy.SparkHadoopUtil
 import scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm.logical._
@@ -29,20 +26,17 @@ import org.apache.mahout.math._
 import scala.reflect.ClassTag
 import org.apache.spark.storage.StorageLevel
 import org.apache.mahout.sparkbindings.blas._
-import org.apache.hadoop.io._
+import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
 import scala.Some
 import scala.collection.JavaConversions._
+import org.apache.spark.SparkContext
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.RLikeDrmOps._
 import org.apache.spark.rdd.RDD
-import org.apache.mahout.common.{Hadoop1HDFSUtil, HDFSUtil}
 
 /** Spark-specific non-drm-method operations */
 object SparkEngine extends DistributedEngine {
 
-  // By default, use Hadoop 1 utils
-  var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
-
   def colSums[K:ClassTag](drm: CheckpointedDrm[K]): Vector = {
     val n = drm.ncol
 
@@ -131,20 +125,47 @@ object SparkEngine extends DistributedEngine {
    *
    * @return DRM[Any] where Any is automatically translated to value type
    */
-  def drmDfsRead (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
+  def drmFromHDFS (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
 
-    val drmMetadata = hdfsUtils.readDrmHeader(path)
-    val k2vFunc = drmMetadata.keyW2ValFunc
-
-    // Load RDD and convert all Writables to value types right away (due to reuse of writables in
-    // Hadoop we must do it right after read operation).
     val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
+        // Get rid of VectorWritable
+        .map(t => (t._1, t._2.get()))
+
+    def getKeyClassTag[K: ClassTag, V](rdd: RDD[(K, V)]) = implicitly[ClassTag[K]]
+
+    // Spark should've loaded the type info from the header, right?
+    val keyTag = getKeyClassTag(rdd)
+
+    val (key2valFunc, val2keyFunc, unwrappedKeyTag) = keyTag match {
+
+      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[IntWritable]]) => (
+          (v: AnyRef) => v.asInstanceOf[IntWritable].get,
+          (x: Any) => new IntWritable(x.asInstanceOf[Int]),
+          implicitly[ClassTag[Int]])
 
-        // Immediately convert keys and value writables into value types.
-        .map { case (wKey, wVec) => k2vFunc(wKey) -> wVec.get()}
+      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[Text]]) => (
+          (v: AnyRef) => v.asInstanceOf[Text].toString,
+          (x: Any) => new Text(x.toString),
+          implicitly[ClassTag[String]])
 
-    // Wrap into a DRM type with correct matrix row key class tag evident.
-    drmWrap(rdd = rdd, cacheHint = CacheHint.NONE)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[LongWritable]]) => (
+          (v: AnyRef) => v.asInstanceOf[LongWritable].get,
+          (x: Any) => new LongWritable(x.asInstanceOf[Int]),
+          implicitly[ClassTag[Long]])
+
+      case xx: ClassTag[Writable] => (
+          (v: AnyRef) => v,
+          (x: Any) => x.asInstanceOf[Writable],
+          ClassTag(classOf[Writable]))
+    }
+
+    {
+      implicit def getWritable(x: Any): Writable = val2keyFunc()
+
+      val drmRdd = rdd.map { t => (key2valFunc(t._1), t._2)}
+
+      drmWrap(rdd = drmRdd, cacheHint = CacheHint.MEMORY_ONLY)(unwrappedKeyTag.asInstanceOf[ClassTag[Any]])
+    }
   }
 
   /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
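
A note on the restored drmFromHDFS above: the RDD is created with
classOf[Writable], so the tag recovered by getKeyClassTag is, statically,
ClassTag[Writable]; the IntWritable/Text/LongWritable cases only match if
Spark narrows the key type at load time, which the in-diff comment itself
questions. A self-contained sketch of the static behavior (keyTagOf mirrors
getKeyClassTag and is illustrative):

    import scala.reflect.ClassTag
    import org.apache.hadoop.io.{IntWritable, Writable}

    def keyTagOf[K: ClassTag, V](pairs: Seq[(K, V)]): ClassTag[K] =
      implicitly[ClassTag[K]]

    // Element type is statically (Writable, String), so the tag is
    // ClassTag[Writable] even if every runtime key is an IntWritable.
    val tag = keyTagOf(Seq.empty[(Writable, String)])
    println(tag == implicitly[ClassTag[IntWritable]])  // false
    println(tag == implicitly[ClassTag[Writable]])     // true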

http://git-wip-us.apache.org/repos/asf/mahout/blob/3c88f504/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index b753f6f..cc5ebf2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -65,9 +65,6 @@ class CheckpointedDrmSpark[K: ClassTag](
   private var cached: Boolean = false
   override val context: DistributedContext = rdd.context
 
-  /** Explicit extraction of key class Tag   */
-  def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]]
-
   /**
   * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer
    * and writing down Spark graph lineage since last checkpointed DRM.
@@ -155,7 +152,7 @@ class CheckpointedDrmSpark[K: ClassTag](
    * Dump matrix as computed Mahout's DRM into specified (HD)FS path
    * @param path
    */
-  def dfsWrite(path: String) = {
+  def writeDRM(path: String) = {
     val ktag = implicitly[ClassTag[K]]
 
     implicit val k2wFunc: (K) => Writable =

