mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smar...@apache.org
Subject [3/4] mahout git commit: WIP, Mahout-Flink Integration, adding missing methods; code refactoring
Date Tue, 27 Oct 2015 04:20:13 GMT
WIP, Mahout-Flink Integration, adding missing methods; code refactoring


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/38d08085
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/38d08085
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/38d08085

Branch: refs/heads/flink-binding
Commit: 38d0808523800a4369b18251e58b04d61771baf5
Parents: 0c6351f
Author: smarthi <smarthi@apache.org>
Authored: Mon Oct 26 20:59:42 2015 -0700
Committer: smarthi <smarthi@apache.org>
Committed: Mon Oct 26 20:59:42 2015 -0700

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      | 18 +++++++-
 .../drm/CheckpointedFlinkDrmOps.scala           | 35 +++++++++++++++
 .../mahout/flinkbindings/drm/FlinkDrm.scala     | 20 +++++----
 .../apache/mahout/flinkbindings/package.scala   | 45 +++++++++-----------
 .../drm/CheckpointedDrmSpark.scala              | 25 ++++++-----
 .../drm/CheckpointedDrmSparkOps.scala           | 19 +++++++++
 .../apache/mahout/sparkbindings/package.scala   |  4 +-
 7 files changed, 116 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 5915c0a..0bc12aa 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -20,6 +20,8 @@ package org.apache.mahout.flinkbindings
 
 import java.util.Collection
 
+import org.apache.flink.api.java.utils.DataSetUtils
+
 import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -64,9 +66,10 @@ object FlinkEngine extends DistributedEngine {
     implicit val env = dc.asInstanceOf[FlinkDistributedContext].env
 
     val metadata = hdfsUtils.readDrmHeader(path)
+    println(metadata)
 
     val unwrapKey = metadata.unwrapKeyFunction
-
+    println(unwrapKey)
     val dataset = env.readHadoopFile(new SequenceFileInputFormat[Writable, VectorWritable],
       classOf[Writable], classOf[VectorWritable], path)
 
@@ -221,7 +224,9 @@ object FlinkEngine extends DistributedEngine {
   /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as
data set keys. */
   override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
                                            (implicit dc: DistributedContext): CheckpointedDrm[Int]
= {
+
     val parallelDrm = parallelize(m, numPartitions)
+
     new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols())
   }
 
@@ -276,6 +281,17 @@ object FlinkEngine extends DistributedEngine {
 
   def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean
= false): Matrix = ???
 
+//  def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean
= false): Matrix = {
+//
+//    val ncol = drmX match {
+//      case cp: CheckpointedFlinkDrm[K] ⇒ cp.ncol
+//      case _ ⇒ -1
+//    }
+//
+//    val sample = DataSetUtils.sampleWithSize(drmX.dataset, replacement, numSamples)
+//
+//  }
+
   /** Optional engine-specific all reduce tensor operation. */
   def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc):
Matrix = 
     throw new UnsupportedOperationException("the operation allreduceBlock is not yet supported
on Flink")

http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
new file mode 100644
index 0000000..a037d44
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mahout.flinkbindings.drm
+
+import org.apache.mahout.math.drm.CheckpointedDrm
+
+import scala.reflect.ClassTag
+
+class CheckpointedFlinkDrmOps[K: ClassTag](drm: CheckpointedDrm[K]) {
+  assert(drm.isInstanceOf[CheckpointedFlinkDrm[K]], "must be a Flink-backed matrix")
+
+  private[flinkbindings] val flinkDrm = drm.asInstanceOf[CheckpointedFlinkDrm[K]]
+
+  /** Flink matrix customization exposure */
+  def dataset = flinkDrm.ds
+
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
index 4a16724..dbc6b11 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
@@ -65,15 +65,17 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int)
extends Fl
         val it = values.asScala.seq
 
         val (keys, vectors) = it.unzip
-        val isDense = vectors.head.isDense
-
-        if (isDense) {
-          val matrix = new DenseMatrix(vectors.size, ncolLocal)
-          vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
-          out.collect((keys.toArray(classTag), matrix))
-        } else {
-          val matrix = new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
-          out.collect((keys.toArray(classTag), matrix))
+        if (vectors.nonEmpty) {
+          val isDense = vectors.head.isDense
+
+          if (isDense) {
+            val matrix = new DenseMatrix(vectors.size, ncolLocal)
+            vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
+            out.collect((keys.toArray(classTag), matrix))
+          } else {
+            val matrix = new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
+            out.collect((keys.toArray(classTag), matrix))
+          }
         }
       }
     })

http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index c77a551..656b8de 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -18,34 +18,19 @@
  */
 package org.apache.mahout
 
+import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrmOps, CheckpointedFlinkDrm,
FlinkDrm, RowsFlinkDrm}
+import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable}
+import org.apache.mahout.math.drm.{BlockifiedDrmTuple, CheckpointedDrm, DistributedContext,
DrmTuple, _}
+import org.slf4j.LoggerFactory
+
 import scala.Array._
 import scala.reflect.ClassTag
-import org.apache.flink.api.common.functions.FilterFunction
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.ExecutionEnvironment
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.FlinkDistributedContext
-import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math._
-import org.apache.mahout.math.DenseVector
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.MatrixWritable
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.VectorWritable
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.BlockifiedDrmTuple
-import org.apache.mahout.math.drm.CheckpointedDrm
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.mahout.math.drm.DrmTuple
-import org.slf4j.LoggerFactory
-import org.apache.mahout.math.drm.logical.CheckpointAction
 
 package object flinkbindings {
 
-  private[flinkbindings] val log = LoggerFactory.getLogger("apache.org.mahout.flinkbingings")
+  private[flinkbindings] val log = LoggerFactory.getLogger("apache.org.mahout.flinkbindings")
 
   /** Row-wise organized DRM dataset type */
   type DrmDataSet[K] = DataSet[DrmTuple[K]]
@@ -64,18 +49,28 @@ package object flinkbindings {
 
   implicit def wrapContext(env: ExecutionEnvironment): FlinkDistributedContext =
     new FlinkDistributedContext(env)
+
   implicit def unwrapContext(ctx: FlinkDistributedContext): ExecutionEnvironment = ctx.env
 
-  private[flinkbindings] implicit def castCheckpointedDrm[K: ClassTag](drm: CheckpointedDrm[K]):
CheckpointedFlinkDrm[K] = {
+  private[flinkbindings] implicit def castCheckpointedDrm[K: ClassTag](drm: CheckpointedDrm[K])
+    : CheckpointedFlinkDrm[K] = {
+
     assert(drm.isInstanceOf[CheckpointedFlinkDrm[K]], "it must be a Flink-backed matrix")
     drm.asInstanceOf[CheckpointedFlinkDrm[K]]
   }
 
-  implicit def checkpointeDrmToFlinkDrm[K: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K]
= {
+  implicit def checkpointedDrmToFlinkDrm[K: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K]
= {
     val flinkDrm = castCheckpointedDrm(cp)
     new RowsFlinkDrm[K](flinkDrm.ds, flinkDrm.ncol)
   }
 
+  /** Adding Flink-specific ops */
+  implicit def cpDrm2cpDrmFlinkOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedFlinkDrmOps[K]
=
+    new CheckpointedFlinkDrmOps[K](drm)
+
+  implicit def drm2cpDrmFlinkOps[K: ClassTag](drm: DrmLike[K]): CheckpointedFlinkDrmOps[K]
= drm: CheckpointedDrm[K]
+
+
   private[flinkbindings] implicit def wrapAsWritable(m: Matrix): MatrixWritable = new MatrixWritable(m)
   private[flinkbindings] implicit def wrapAsWritable(v: Vector): VectorWritable = new VectorWritable(v)
   private[flinkbindings] implicit def unwrapFromWritable(w: MatrixWritable): Matrix = w.get()

http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 38007e0..857cca0 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -17,18 +17,18 @@
 
 package org.apache.mahout.sparkbindings.drm
 
+import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
 import org.apache.mahout.math._
-import math._
-import scalabindings._
-import RLikeOps._
-import drm._
-import scala.collection.JavaConversions._
-import org.apache.spark.storage.StorageLevel
-import reflect._
-import scala.util.Random
-import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
 import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.sparkbindings._
+import org.apache.spark.storage.StorageLevel
+
+import scala.collection.JavaConversions._
+import scala.math._
+import scala.reflect._
+import scala.util.Random
 
 /** ==Spark-specific optimizer-checkpointed DRM.==
   *
@@ -39,7 +39,6 @@ import org.apache.mahout.sparkbindings._
   * @param partitioningTag unique partitioning tag. Used to detect identically partitioned
operands.
   * @param _canHaveMissingRows true if the matrix is int-keyed, and if it also may have missing
rows
   *                            (will require a lazy fix for some physical operations.
-  * @param evidence$1 class tag context bound for K.
   * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
   */
 class CheckpointedDrmSpark[K: ClassTag](
@@ -182,7 +181,7 @@ class CheckpointedDrmSpark[K: ClassTag](
       // that nrow can be computed lazily, which always happens when rdd is already available,
cached,
       // and it's ok to compute small summaries without triggering huge pipelines. Which
usually
       // happens right after things like drmFromHDFS or drmWrap().
-      val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
+      val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max) + 1L
       val rowCount = rdd.count()
       _canHaveMissingRows = maxPlus1 != rowCount ||
           rdd.map(_._1).sum().toLong != (rowCount * (rowCount - 1.0) / 2.0).toLong
@@ -197,8 +196,8 @@ class CheckpointedDrmSpark[K: ClassTag](
   protected def computeNCol = {
     rddInput.isBlockified match {
       case true ⇒ rddInput.asBlockified(throw new AssertionError("not reached"))
-        .map(_._2.ncol).reduce(max(_, _))
-      case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max(_, _))
+        .map(_._2.ncol).reduce(max)
+      case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
index 25953e1..0a1757a 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -1,3 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.mahout.sparkbindings.drm
 
 import org.apache.mahout.math.drm.CheckpointedDrm

http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 330ae38..91ad47d 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -182,7 +182,7 @@ package object sparkbindings {
     val w = new StringWriter()
     closeables += w
 
-    var continue = true;
+    var continue = true
     val jars = new mutable.ArrayBuffer[String]()
     do {
       val cp = r.readLine()
@@ -230,7 +230,7 @@ package object sparkbindings {
 
     if (!part1Req) warn("blockified rdd: condition not met: exactly 1 per partition")
 
-    return part1Req
+    part1Req
   }
 
 }


Mime
View raw message