mahout-commits mailing list archives

From apalu...@apache.org
Subject [1/2] mahout git commit: Revert "Revert "MAHOUT-1800: Pare down Casstag overuse closes apache/mahout#183""
Date Tue, 08 Mar 2016 16:36:08 GMT
Repository: mahout
Updated Branches:
  refs/heads/master fcd6b9e01 -> e8b9c8003
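
The hunks below repeatedly make the same change: a [K: ClassTag] context bound is dropped from a generic method, and the ClassTag is instead recovered from the operand itself (drm.keyClassTag / op.keyClassTag) and brought into implicit scope only where it is actually needed. The following is a minimal, self-contained sketch of that pattern; the Keyed trait, describe method, and CasstagSketch object are hypothetical stand-ins, not Mahout API.

import scala.reflect.ClassTag

// Hypothetical stand-in for Mahout's CheckpointedDrm / operator classes, which carry
// their key ClassTag as a plain value member rather than as an implicit parameter.
trait Keyed[K] {
  def keyClassTag: ClassTag[K]
}

object CasstagSketch {

  // Before: def describe[K: ClassTag](t: Keyed[K]): String
  // After: no context bound; the tag is recovered from the operand and made implicit
  // only inside the method body, mirroring the hunks in this commit.
  def describe[K](t: Keyed[K]): String = {
    implicit val ktag: ClassTag[K] = t.keyClassTag
    if (ktag == ClassTag.Int) "int-keyed" else s"keyed by ${ktag.runtimeClass.getSimpleName}"
  }

  def main(args: Array[String]): Unit = {
    val byInt = new Keyed[Int] { val keyClassTag = ClassTag.Int }
    val byString = new Keyed[String] { val keyClassTag = scala.reflect.classTag[String] }
    println(describe(byInt))     // int-keyed
    println(describe(byString))  // keyed by String
  }
}

Carrying the tag as a value on the data object means callers no longer have to thread ClassTag evidence through every generic signature in the engine.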


http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 42ddceb..99412df 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -24,7 +24,7 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm.logical._
-import org.apache.mahout.sparkbindings.drm.{cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput}
+import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSparkOps, cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput}
 import org.apache.mahout.math._
 import scala.Predef
 import scala.reflect.ClassTag
@@ -46,7 +46,7 @@ object SparkEngine extends DistributedEngine {
   // By default, use Hadoop 1 utils
   var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
 
-  def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+  def colSums[K](drm: CheckpointedDrm[K]): Vector = {
     val n = drm.ncol
 
     drm.rdd
@@ -56,7 +56,7 @@ object SparkEngine extends DistributedEngine {
 
       // Fold() doesn't work with kryo still. So work around it.
       .mapPartitions(iter ⇒ {
-      val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) ⇒  acc += v)
+      val acc = ((new DenseVector(n): Vector) /: iter) ((acc, v) ⇒ acc += v)
       Iterator(acc)
     })
 
@@ -65,7 +65,7 @@ object SparkEngine extends DistributedEngine {
       .reduce(_ += _)
   }
 
-  def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+  def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector = {
     val n = drm.ncol
 
     drm.rdd
@@ -76,7 +76,7 @@ object SparkEngine extends DistributedEngine {
       // Fold() doesn't work with kryo still. So work around it.
       .mapPartitions(iter ⇒ {
       val acc = ((new DenseVector(n): Vector) /: iter) { (acc, v) ⇒
-        v.nonZeroes().foreach { elem ⇒  acc(elem.index) += 1}
+        v.nonZeroes().foreach { elem ⇒ acc(elem.index) += 1 }
         acc
       }
       Iterator(acc)
@@ -87,10 +87,10 @@ object SparkEngine extends DistributedEngine {
   }
 
   /** Engine-specific colMeans implementation based on a checkpoint. */
-  override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector =
+  override def colMeans[K](drm: CheckpointedDrm[K]): Vector =
     if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
 
-  override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double =
+  override def norm[K](drm: CheckpointedDrm[K]): Double =
     drm.rdd
       // Compute sum of squares of each vector
       .map {
@@ -100,7 +100,7 @@ object SparkEngine extends DistributedEngine {
 
 
   /** Optional engine-specific all reduce tensor operation. */
-  override def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf:
+  override def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf:
   BlockReduceFunc): Matrix = {
 
     import drm._
@@ -108,11 +108,11 @@ object SparkEngine extends DistributedEngine {
   }
 
   /**
-   * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
-   *
-   * A particular physical engine implementation may choose to either use or not use these rewrites
-   * as a useful basic rewriting rule.<P>
-   */
+    * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
+    *
+    * A particular physical engine implementation may choose to either use or not use these rewrites
+    * as a useful basic rewriting rule.<P>
+    */
   override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action)
 
 
@@ -139,14 +139,13 @@ object SparkEngine extends DistributedEngine {
   def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = dc.broadcast(m)
 
   /**
-   * Load DRM from hdfs (as in Mahout DRM format)
-   *
-   * @param path
-   * @param sc spark context (wanted to make that implicit, doesn't work in current version of
-   *           scala with the type bounds, sorry)
-   *
-   * @return DRM[Any] where Any is automatically translated to value type
-   */
+    * Load DRM from hdfs (as in Mahout DRM format)
+    *
+    * @param path
+    * @param sc spark context (wanted to make that implicit, doesn't work in current version of
+    *           scala with the type bounds, sorry)
+    * @return DRM[Any] where Any is automatically translated to value type
+    */
   def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
 
     // Require that context is actually Spark context.
@@ -163,7 +162,7 @@ object SparkEngine extends DistributedEngine {
     val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
 
       // Immediately convert keys and value writables into value types.
-      .map { case (wKey, wVec) ⇒ k2vFunc(wKey) -> wVec.get()}
+      .map { case (wKey, wVec) ⇒ k2vFunc(wKey) -> wVec.get() }
 
     // Wrap into a DRM type with correct matrix row key class tag evident.
     drmWrap(rdd = rdd, cacheHint = CacheHint.NONE)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]])
@@ -221,11 +220,12 @@ object SparkEngine extends DistributedEngine {
   }
 
   /**
-   * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
-   * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
-   */
-  override def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = {
-    if (classTag[K] == ClassTag.Int) {
+    * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
+    * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
+    */
+  override def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = {
+    implicit val ktag = drmX.keyClassTag
+    if (ktag == ClassTag.Int) {
       drmX.asInstanceOf[DrmLike[Int]] → None
     } else {
 
@@ -237,26 +237,29 @@ object SparkEngine extends DistributedEngine {
       val (intRdd, keyMap) = blas.rekeySeqInts(rdd = drmXcp.rdd, computeMap = computeMap)
 
       // Convert computed key mapping to a matrix.
-      val mxKeyMap = keyMap.map { rdd =>
-        drmWrap(rdd = rdd.map { case (key, ordinal) ⇒ key → (dvec(ordinal):Vector)}, ncol = 1, nrow = nrow)
+      val mxKeyMap = keyMap.map { rdd ⇒
+        drmWrap(rdd = rdd.map { case (key, ordinal) ⇒ key → (dvec(ordinal): Vector) }, ncol = 1, nrow = nrow)
       }
 
 
       drmWrap(rdd = intRdd, ncol = ncol) → mxKeyMap
-  }
+    }
 
   }
 
 
   /**
-   * (Optional) Sampling operation. Consistent with Spark semantics of the same.
-   * @param drmX
-   * @param fraction
-   * @param replacement
-   * @tparam K
-   * @return
-   */
-  override def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = {
+    * (Optional) Sampling operation. Consistent with Spark semantics of the same.
+    *
+    * @param drmX
+    * @param fraction
+    * @param replacement
+    * @tparam K
+    * @return
+    */
+  override def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = {
+
+    implicit val ktag = drmX.keyClassTag
 
     // We do want to take ncol if already computed, if not, then we don't want to trigger computation
     // here.
@@ -265,14 +268,14 @@ object SparkEngine extends DistributedEngine {
       case _ ⇒ -1
     }
     val sample = drmX.rdd.sample(withReplacement = replacement, fraction = fraction)
-    if (classTag[K] != ClassTag.Int) return drmWrap(sample, ncol = ncol)
+    if (ktag != ClassTag.Int) return drmWrap(sample, ncol = ncol)
 
     // K == Int: Int-keyed sample. rebase int counts.
     drmWrap(rdd = blas.rekeySeqInts(rdd = sample, computeMap = false)._1, ncol = ncol).asInstanceOf[DrmLike[K]]
   }
 
 
-  override def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = {
+  override def drmSampleKRows[K](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = {
 
     val ncol = drmX match {
       case cp: CheckpointedDrmSpark[K] ⇒ cp._ncol
@@ -286,9 +289,9 @@ object SparkEngine extends DistributedEngine {
     val isSparse = sample.exists { case (_, vec) ⇒ !vec.isDense }
 
     val vectors = sample.map(_._2)
-    val labels = sample.view.zipWithIndex.map { case ((key, _), idx) ⇒ key.toString → (idx:Integer) }.toMap
+    val labels = sample.view.zipWithIndex.map { case ((key, _), idx) ⇒ key.toString → (idx: Integer) }.toMap
 
-    val mx:Matrix = if (isSparse) sparse(vectors:_*) else dense(vectors)
+    val mx: Matrix = if (isSparse) sparse(vectors: _*) else dense(vectors)
     mx.setRowLabelBindings(labels)
 
     mx
@@ -301,7 +304,7 @@ object SparkEngine extends DistributedEngine {
     case CacheHint.MEMORY_ONLY ⇒ StorageLevel.MEMORY_ONLY
     case CacheHint.MEMORY_ONLY_2 ⇒ StorageLevel.MEMORY_ONLY_2
     case CacheHint.MEMORY_ONLY_SER ⇒ StorageLevel.MEMORY_ONLY_SER
-      case CacheHint.MEMORY_ONLY_SER_2 ⇒ StorageLevel.MEMORY_ONLY_SER_2
+    case CacheHint.MEMORY_ONLY_SER_2 ⇒ StorageLevel.MEMORY_ONLY_SER_2
     case CacheHint.MEMORY_AND_DISK ⇒ StorageLevel.MEMORY_AND_DISK
     case CacheHint.MEMORY_AND_DISK_2 ⇒ StorageLevel.MEMORY_AND_DISK_2
     case CacheHint.MEMORY_AND_DISK_SER ⇒ StorageLevel.MEMORY_AND_DISK_SER
@@ -309,7 +312,7 @@ object SparkEngine extends DistributedEngine {
   }
 
   /** Translate previously optimized physical plan */
-  private def tr2phys[K: ClassTag](oper: DrmLike[K]): DrmRddInput[K] = {
+  private def tr2phys[K](oper: DrmLike[K]): DrmRddInput[K] = {
     // I do explicit evidence propagation here since matching via case classes seems to be loosing
     // it and subsequently may cause something like DrmRddInput[Any] instead of [Int] or [String].
     // Hence you see explicit evidence attached to all recursive exec() calls.
@@ -319,28 +322,32 @@ object SparkEngine extends DistributedEngine {
       // (we cannot do actual flip for non-int-keyed arguments)
       case OpAtAnyKey(_) ⇒
         throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
-      case op@OpAt(a) ⇒ At.at(op, tr2phys(a)(op.classTagA))
-      case op@OpABt(a, b) ⇒ ABt.abt(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAtB(a, b) ⇒ AtB.atb(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAtA(a) ⇒ AtA.at_a(op, tr2phys(a)(op.classTagA))
-      case op@OpAx(a, x) ⇒ Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
-      case op@OpAtx(a, x) ⇒ Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
-      case op@OpAewUnaryFunc(a, _, _) ⇒ AewB.a_ew_func(op, tr2phys(a)(op.classTagA))
-      case op@OpAewUnaryFuncFusion(a, _) ⇒ AewB.a_ew_func(op, tr2phys(a)(op.classTagA))
-      case op@OpAewB(a, b, opId) ⇒ AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpCbind(a, b) ⇒ CbindAB.cbindAB_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpCbindScalar(a, _, _) ⇒ CbindAB.cbindAScalar(op, tr2phys(a)(op.classTagA))
-      case op@OpRbind(a, b) ⇒ RbindAB.rbindAB(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAewScalar(a, s, _) ⇒ AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
-      case op@OpRowRange(a, _) ⇒ Slicing.rowRange(op, tr2phys(a)(op.classTagA))
-      case op@OpTimesRightMatrix(a, _) ⇒ AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
+      case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ At.at(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
+      case op@OpABt(a, b) ⇒ ABt.abt(op, tr2phys(a), tr2phys(b))
+      case op@OpAtB(a, b) ⇒ AtB.atb(op, tr2phys(a), tr2phys(b)).asInstanceOf[DrmRddInput[K]]
+      case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ AtA.at_a(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
+      case op@OpAx(a, x) ⇒ Ax.ax_with_broadcast(op, tr2phys(a))
+      case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒
+        Ax.atx_with_broadcast(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
+      case op@OpAewUnaryFunc(a, _, _) ⇒ AewB.a_ew_func(op, tr2phys(a))
+      case op@OpAewUnaryFuncFusion(a, _) ⇒ AewB.a_ew_func(op, tr2phys(a))
+      case op@OpAewB(a, b, opId) ⇒ AewB.a_ew_b(op, tr2phys(a), tr2phys(b))
+      case op@OpCbind(a, b) ⇒ CbindAB.cbindAB_nograph(op, tr2phys(a), tr2phys(b))
+      case op@OpCbindScalar(a, _, _) ⇒ CbindAB.cbindAScalar(op, tr2phys(a))
+      case op@OpRbind(a, b) ⇒ RbindAB.rbindAB(op, tr2phys(a), tr2phys(b))
+      case op@OpAewScalar(a, s, _) ⇒ AewB.a_ew_scalar(op, tr2phys(a), s)
+      case op@OpRowRange(a, _) if op.keyClassTag == ClassTag.Int ⇒
+        Slicing.rowRange(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
+      case op@OpTimesRightMatrix(a, _) ⇒ AinCoreB.rightMultiply(op, tr2phys(a))
       // Custom operators, we just execute them
-      case blockOp: OpMapBlock[K, _] ⇒ MapBlock.exec(
-        src = tr2phys(blockOp.A)(blockOp.classTagA),
+      case blockOp: OpMapBlock[_, K] ⇒ MapBlock.exec(
+        src = tr2phys(blockOp.A),
         operator = blockOp
       )
-      case op@OpPar(a, _, _) ⇒ Par.exec(op, tr2phys(a)(op.classTagA))
-      case cp: CheckpointedDrm[K] ⇒ cp.rdd: DrmRddInput[K]
+      case op@OpPar(a, _, _) ⇒ Par.exec(op, tr2phys(a))
+      case cp: CheckpointedDrm[K] ⇒
+        implicit val ktag=cp.keyClassTag
+        cp.rdd: DrmRddInput[K]
       case _ ⇒ throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s."
         .format(oper))
 
@@ -348,32 +355,34 @@ object SparkEngine extends DistributedEngine {
   }
 
   /**
-   * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
-   * delimited files. Reads a vector per row.
-   * @param src a comma separated list of URIs to read from
-   * @param schema how the text file is formatted
-   */
+    * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
+    * delimited files. Reads a vector per row.
+    *
+    * @param src    a comma separated list of URIs to read from
+    * @param schema how the text file is formatted
+    */
   def indexedDatasetDFSRead(src: String,
-      schema: Schema = DefaultIndexedDatasetReadSchema,
-      existingRowIDs: Option[BiDictionary] = None)
-      (implicit sc: DistributedContext):
-    IndexedDatasetSpark = {
+                            schema: Schema = DefaultIndexedDatasetReadSchema,
+                            existingRowIDs: Option[BiDictionary] = None)
+                           (implicit sc: DistributedContext):
+  IndexedDatasetSpark = {
     val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
     val ids = reader.readRowsFrom(src, existingRowIDs)
     ids
   }
 
   /**
-   * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
-   * delimited files. Reads an element per row.
-   * @param src a comma separated list of URIs to read from
-   * @param schema how the text file is formatted
-   */
+    * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
+    * delimited files. Reads an element per row.
+    *
+    * @param src    a comma separated list of URIs to read from
+    * @param schema how the text file is formatted
+    */
   def indexedDatasetDFSReadElements(src: String,
-      schema: Schema = DefaultIndexedDatasetElementReadSchema,
-      existingRowIDs: Option[BiDictionary] = None)
-      (implicit sc: DistributedContext):
-    IndexedDatasetSpark = {
+                                    schema: Schema = DefaultIndexedDatasetElementReadSchema,
+                                    existingRowIDs: Option[BiDictionary] = None)
+                                   (implicit sc: DistributedContext):
+  IndexedDatasetSpark = {
     val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
     val ids = reader.readElementsFrom(src, existingRowIDs)
     ids

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 11e2bad..5142d3b 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -44,13 +44,13 @@ object ABt {
    * @param srcB B source RDD 
    * @tparam K
    */
-  def abt[K: ClassTag](
+  def abt[K](
       operator: OpABt[K],
       srcA: DrmRddInput[K],
       srcB: DrmRddInput[Int]): DrmRddInput[K] = {
 
     debug("operator AB'(Spark)")
-    abt_nograph(operator, srcA, srcB)
+    abt_nograph(operator, srcA, srcB)(operator.keyClassTag)
   }
 
   /**
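
The abt hunk above shows the complementary move: when a callee (here abt_nograph) still declares a ClassTag context bound, the caller can supply the evidence explicitly in the extra parameter list instead of declaring a bound of its own, exactly as abt_nograph(operator, srcA, srcB)(operator.keyClassTag) does. A small self-contained sketch, with hypothetical names (needsTag, caller):

import scala.reflect.ClassTag

object ExplicitEvidenceSketch {

  // A callee that still uses a ClassTag context bound; it desugars to an extra
  // implicit parameter list (implicit ev: ClassTag[K]).
  def needsTag[K: ClassTag](xs: Seq[K]): Array[K] = xs.toArray

  // A caller without the bound passes the evidence explicitly as a value.
  def caller[K](xs: Seq[K], keyClassTag: ClassTag[K]): Array[K] = needsTag(xs)(keyClassTag)

  def main(args: Array[String]): Unit =
    println(caller(Seq(1, 2, 3), ClassTag.Int).mkString(","))   // 1,2,3
}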

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index 8a90398..d8637d2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -17,20 +17,18 @@
 
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.sparkbindings.drm.DrmRddInput
-import scala.reflect.ClassTag
-import org.apache.spark.SparkContext._
+import org.apache.mahout.logging._
 import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.{SequentialAccessSparseVector, Matrix, Vector}
-import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, TEwFunc, OpAewScalar, OpAewB}
-import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFuncScalar, ReduceFunc}
-import org.apache.mahout.sparkbindings.{BlockifiedDrmRdd, DrmRdd, drm}
 import org.apache.mahout.math.drm._
-import org.apache.mahout.logging._
-import collection._
-import JavaConversions._
+import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, OpAewB, OpAewScalar, TEwFunc}
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFunc, ReduceFuncScalar}
+import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.sparkbindings.{BlockifiedDrmRdd, DrmRdd, drm}
+
+import scala.reflect.{ClassTag, classTag}
+import scala.collection.JavaConversions._
 
 /** Elementwise drm-drm operators */
 object AewB {
@@ -53,7 +51,9 @@ object AewB {
 
 
   /** Elementwise matrix-matrix operator, now handles both non- and identically partitioned */
-  def a_ew_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+  def a_ew_b[K](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+
+    implicit val ktag = op.keyClassTag
 
     val ewOps = getEWOps()
     val opId = op.op
@@ -111,15 +111,16 @@ object AewB {
     rdd
   }
 
-  def a_ew_func[K:ClassTag](op:AbstractUnaryOp[K,K] with TEwFunc, srcA: DrmRddInput[K]):DrmRddInput[K] = {
+  def a_ew_func[K](op:AbstractUnaryOp[K,K] with TEwFunc, srcA: DrmRddInput[K]):DrmRddInput[K] = {
 
     val evalZeros = op.evalZeros
     val inplace = ewInplace()
     val f = op.f
+    implicit val ktag = op.keyClassTag
 
     // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
     // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
-    val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
+    val aBlockRdd = if (classTag[K] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
       val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
       drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {
@@ -149,12 +150,13 @@ object AewB {
   }
 
   /** Physical algorithm to handle matrix-scalar operators like A - s or s -: A */
-  def a_ew_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double):
+  def a_ew_scalar[K](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double):
   DrmRddInput[K] = {
 
 
     val ewOps = getEWOps()
     val opId = op.op
+    implicit val ktag = op.keyClassTag
 
     val reduceFunc = opId match {
       case "+" => ewOps.plusScalar
@@ -168,7 +170,7 @@ object AewB {
 
     // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
     // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
-    val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows) {
+    val aBlockRdd = if (classTag[K] == ClassTag.Int && op.A.canHaveMissingRows) {
       val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
       drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {
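
In a_ew_func and a_ew_scalar above, the key-type check keeps using classTag[K] == ClassTag.Int even though the context bound is gone, because classTag[K] simply summons whatever implicit ClassTag[K] is in scope, and implicit val ktag = op.keyClassTag now provides it. A minimal sketch of that mechanism (isIntKeyed is a hypothetical helper, not Mahout code):

import scala.reflect.{ClassTag, classTag}

object TagLookupSketch {

  // classTag[K] resolves to the implicit ClassTag[K] in scope, so once the operator's
  // keyClassTag has been made implicit the runtime "is this Int-keyed?" check works
  // without a [K: ClassTag] bound on the method itself.
  def isIntKeyed[K](keyClassTag: ClassTag[K]): Boolean = {
    implicit val ktag: ClassTag[K] = keyClassTag
    classTag[K] == ClassTag.Int
  }

  def main(args: Array[String]): Unit = {
    println(isIntKeyed(ClassTag.Int))         // true
    println(isIntKeyed(classTag[String]))     // false
  }
}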

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
index 5f9f84a..6fe076e 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -17,7 +17,10 @@ object AinCoreB {
 
   private final implicit val log = getLog(AinCoreB.getClass)
 
-  def rightMultiply[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
+  def rightMultiply[K](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
+
+    implicit val ktag = op.keyClassTag
+
     if ( op.right.isInstanceOf[DiagonalMatrix])
       rightMultiply_diag(op, srcA)
     else

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
index 629accd..42e56e7 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
@@ -13,10 +13,11 @@ import org.apache.mahout.math.drm.logical.{OpAx, OpAtx}
 /** Matrix product with one of operands an in-core matrix */
 object Ax {
 
-  def ax_with_broadcast[K: ClassTag](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
+  def ax_with_broadcast[K](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
 
     val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
     implicit val sc: DistributedContext = rddA.sparkContext
+    implicit val ktag = op.keyClassTag
 
     val bcastX = drmBroadcast(op.x)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
index 4a379ec..e900749 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
@@ -32,7 +32,9 @@ object CbindAB {
 
   private val log = Logger.getLogger(CbindAB.getClass)
 
-  def cbindAScalar[K:ClassTag](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = {
+  def cbindAScalar[K](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = {
+
+    implicit val ktag = op.keyClassTag
     val srcRdd = srcA.toDrmRdd()
 
     val ncol = op.A.ncol
@@ -60,13 +62,14 @@ object CbindAB {
     resultRdd
   }
 
-  def cbindAB_nograph[K: ClassTag](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+  def cbindAB_nograph[K](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
 
     val a = srcA.toDrmRdd()
     val b = srcB.toDrmRdd()
     val n = op.ncol
     val n1 = op.A.ncol
     val n2 = n - n1
+    implicit val ktag = op.keyClassTag
 
     // Check if A and B are identically partitioned AND keyed. if they are, then just perform zip
     // instead of join, and apply the op map-side. Otherwise, perform join and apply the op

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
index 4cd9a74..6104d83 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
@@ -23,7 +23,7 @@ import RLikeOps._
 import org.apache.mahout.math.{SequentialAccessSparseVector, DenseVector}
 import org.apache.mahout.sparkbindings.DrmRdd
 
-class DrmRddOps[K: ClassTag](private[blas] val rdd: DrmRdd[K]) {
+class DrmRddOps[K](private[blas] val rdd: DrmRdd[K]) {
 
   /** Turn RDD into dense row-wise vectors if density threshold is exceeded. */
   def densify(threshold: Double = 0.80): DrmRdd[K] = rdd.map({

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
index 2933ddc..7e48ed8 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -25,13 +25,14 @@ import scala.reflect.ClassTag
 
 object MapBlock {
 
-  def exec[S, R:ClassTag](src: DrmRddInput[S], operator:OpMapBlock[S,R]): DrmRddInput[R] = {
+  def exec[S, R](src: DrmRddInput[S], operator:OpMapBlock[S,R]): DrmRddInput[R] = {
 
     // We can't use attributes directly in the closure in order to avoid putting the whole object
     // into closure.
     val bmf = operator.bmf
     val ncol = operator.ncol
-    val rdd = src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
+    implicit val rtag = operator.keyClassTag
+    src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
       val out = bmf(blockTuple)
 
       assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.")
@@ -39,8 +40,6 @@ object MapBlock {
 
       out
     })
-
-    rdd
   }
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
index 0434a72..7e32b69 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
@@ -15,8 +15,9 @@ object Par {
 
   private final implicit val log = getLog(Par.getClass)
 
-  def exec[K: ClassTag](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
+  def exec[K](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
 
+    implicit val ktag = op.keyClassTag
     val srcBlockified = src.isBlockified
 
     val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
index 62abba6..14772f6 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
@@ -27,7 +27,9 @@ object RbindAB {
 
   private val log = Logger.getLogger(RbindAB.getClass)
 
-  def rbindAB[K: ClassTag](op: OpRbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+  def rbindAB[K](op: OpRbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+
+    implicit val ktag = op.keyClassTag
 
     // If any of the inputs is blockified, use blockified inputs
     if (srcA.isBlockified || srcB.isBlockified) {

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
index 6b8513f..5a83f80 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
@@ -35,7 +35,7 @@ import JavaConversions._
  */
 package object blas {
 
-  implicit def drmRdd2ops[K: ClassTag](rdd: DrmRdd[K]): DrmRddOps[K] = new DrmRddOps[K](rdd)
+  implicit def drmRdd2ops[K](rdd: DrmRdd[K]): DrmRddOps[K] = new DrmRddOps[K](rdd)
 
 
   /**
@@ -46,7 +46,7 @@ package object blas {
    * @tparam K existing key parameter
    * @return
    */
-  private[mahout] def rekeySeqInts[K: ClassTag](rdd: DrmRdd[K], computeMap: Boolean = true): (DrmRdd[Int],
+  private[mahout] def rekeySeqInts[K](rdd: DrmRdd[K], computeMap: Boolean = true): (DrmRdd[Int],
     Option[RDD[(K, Int)]]) = {
 
     // Spark context please.

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
index abcfc64..3c086fe 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -1,16 +1,17 @@
 package org.apache.mahout.sparkbindings.drm
 
 import org.apache.mahout.math.drm.CheckpointedDrm
+import org.apache.mahout.sparkbindings.DrmRdd
 import scala.reflect.ClassTag
 
 /** Additional Spark-specific operations. Requires underlying DRM to be running on Spark backend. */
-class CheckpointedDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]) {
+class CheckpointedDrmSparkOps[K](drm: CheckpointedDrm[K]) {
 
   assert(drm.isInstanceOf[CheckpointedDrmSpark[K]], "must be a Spark-backed matrix")
 
   private[sparkbindings] val sparkDrm = drm.asInstanceOf[CheckpointedDrmSpark[K]]
 
   /** Spark matrix customization exposure */
-  def rdd = sparkDrm.rddInput.toDrmRdd()
+  def rdd:DrmRdd[K] = sparkDrm.rddInput.toDrmRdd()
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
index e18d6da..b793098 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
@@ -37,15 +37,15 @@ package object drm {
 
   private[drm] final val log = Logger.getLogger("org.apache.mahout.sparkbindings");
 
-  private[sparkbindings] implicit def cpDrm2DrmRddInput[K: ClassTag](cp: CheckpointedDrmSpark[K]): DrmRddInput[K] =
+  private[sparkbindings] implicit def cpDrm2DrmRddInput[K](cp: CheckpointedDrmSpark[K]): DrmRddInput[K] =
     cp.rddInput
 
-  private[sparkbindings] implicit def cpDrmGeneric2DrmRddInput[K: ClassTag](cp: CheckpointedDrm[K]): DrmRddInput[K] =
+  private[sparkbindings] implicit def cpDrmGeneric2DrmRddInput[K](cp: CheckpointedDrm[K]): DrmRddInput[K] =
     cp.asInstanceOf[CheckpointedDrmSpark[K]]
 
-  private[sparkbindings] implicit def drmRdd2drmRddInput[K: ClassTag](rdd: DrmRdd[K]) = new DrmRddInput[K](Left(rdd))
+  private[sparkbindings] implicit def drmRdd2drmRddInput[K:ClassTag](rdd: DrmRdd[K]) = new DrmRddInput[K](Left(rdd))
 
-  private[sparkbindings] implicit def blockifiedRdd2drmRddInput[K: ClassTag](rdd: BlockifiedDrmRdd[K]) = new
+  private[sparkbindings] implicit def blockifiedRdd2drmRddInput[K:ClassTag](rdd: BlockifiedDrmRdd[K]) = new
       DrmRddInput[K](
     Right(rdd))
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 330ae38..de309c3 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -108,10 +108,10 @@ package object sparkbindings {
   implicit def sb2bc[T](b: Broadcast[T]): BCast[T] = new SparkBCast(b)
 
   /** Adding Spark-specific ops */
-  implicit def cpDrm2cpDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedDrmSparkOps[K] =
+  implicit def cpDrm2cpDrmSparkOps[K](drm: CheckpointedDrm[K]): CheckpointedDrmSparkOps[K] =
     new CheckpointedDrmSparkOps[K](drm)
 
-  implicit def drm2cpDrmSparkOps[K: ClassTag](drm: DrmLike[K]): CheckpointedDrmSparkOps[K] = drm: CheckpointedDrm[K]
+  implicit def drm2cpDrmSparkOps[K](drm: DrmLike[K]): CheckpointedDrmSparkOps[K] = drm: CheckpointedDrm[K]
 
   private[sparkbindings] implicit def m2w(m: Matrix): MatrixWritable = new MatrixWritable(m)
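
The cpDrm2cpDrmSparkOps / drm2cpDrmSparkOps conversions above follow the usual Scala "implicit ops" enrichment pattern: an implicit conversion wraps the DRM in CheckpointedDrmSparkOps so the Spark-specific .rdd method appears to be defined on it. A self-contained sketch of that pattern, with hypothetical names (Engine, EngineOps), not Mahout API:

import scala.language.implicitConversions

object ImplicitOpsSketch {

  class Engine(val name: String)

  // The "ops" wrapper that carries the extra, engine-specific method.
  class EngineOps(engine: Engine) {
    def shout: String = engine.name.toUpperCase
  }

  // Implicit conversion that makes .shout available directly on Engine values.
  implicit def engine2ops(engine: Engine): EngineOps = new EngineOps(engine)

  def main(args: Array[String]): Unit =
    println(new Engine("spark").shout)   // SPARK, via the implicit conversion
}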
 

