mahout-commits mailing list archives

From smar...@apache.org
Subject [02/50] [abbrv] mahout git commit: MAHOUT-1810: Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue) closes apache/mahout#198
Date Mon, 11 Apr 2016 08:09:29 GMT
MAHOUT-1810: Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue) closes apache/mahout#198
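
For context, here is a minimal sketch (not part of the patch; names and values are illustrative) of the kind of pipeline the failing test exercises, using the generic Mahout Scala DSL and assuming an implicit distributed context is in scope:

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    // A small in-core matrix, parallelized into a DRM over two partitions.
    val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
    val drmA = drmParallelize(inCoreA, numPartitions = 2)

    // B is derived from A via mapBlock, so it is identically partitioned with A.
    val drmB = drmA.mapBlock() { case (keys, block) => keys -> (block + 1.0) }

    // The elementwise sum A + B is where the Flink mapBlock/checkpointing issue surfaced.
    val inCoreC = (drmA + drmB).collect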


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/f4f42ae4
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/f4f42ae4
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/f4f42ae4

Branch: refs/heads/master
Commit: f4f42ae4c4c7555659edcc43669fec82f9537219
Parents: d1d6fc0
Author: Andrew Palumbo <apalumbo@apache.org>
Authored: Mon Mar 21 22:22:09 2016 -0400
Committer: Andrew Palumbo <apalumbo@apache.org>
Committed: Mon Mar 21 22:22:09 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      | 34 +++++++++-----------
 .../flinkbindings/blas/FlinkOpMapBlock.scala    |  2 +-
 .../drm/CheckpointedFlinkDrm.scala              | 30 ++++-------------
 3 files changed, 23 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/f4f42ae4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index f1e06d0..e606514 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -72,30 +72,30 @@ object FlinkEngine extends DistributedEngine {
     if (metadata.keyClassTag == ClassTag.Int) {
       val ds = env.readSequenceFile(classOf[IntWritable], classOf[VectorWritable], path)
 
-      val res = ds.map(new MapFunction[(IntWritable, VectorWritable), (Any, Vector)] {
-        def map(tuple: (IntWritable, VectorWritable)): (Any, Vector) = {
-          (unwrapKey(tuple._1), tuple._2.get())
+      val res = ds.map(new MapFunction[(IntWritable, VectorWritable), (Int, Vector)] {
+        def map(tuple: (IntWritable, VectorWritable)): (Int, Vector) = {
+          (unwrapKey(tuple._1).asInstanceOf[Int], tuple._2.get())
         }
       })
-      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Int]])
     } else if (metadata.keyClassTag == ClassTag.Long) {
       val ds = env.readSequenceFile(classOf[LongWritable], classOf[VectorWritable], path)
 
-      val res = ds.map(new MapFunction[(LongWritable, VectorWritable), (Any, Vector)] {
-        def map(tuple: (LongWritable, VectorWritable)): (Any, Vector) = {
-          (unwrapKey(tuple._1), tuple._2.get())
+      val res = ds.map(new MapFunction[(LongWritable, VectorWritable), (Long, Vector)] {
+        def map(tuple: (LongWritable, VectorWritable)): (Long, Vector) = {
+          (unwrapKey(tuple._1).asInstanceOf[Long], tuple._2.get())
         }
       })
-      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Long]])
     } else if (metadata.keyClassTag == ClassTag(classOf[String])) {
       val ds = env.readSequenceFile(classOf[Text], classOf[VectorWritable], path)
 
-      val res = ds.map(new MapFunction[(Text, VectorWritable), (Any, Vector)] {
-        def map(tuple: (Text, VectorWritable)): (Any, Vector) = {
-          (unwrapKey(tuple._1), tuple._2.get())
+      val res = ds.map(new MapFunction[(Text, VectorWritable), (String, Vector)] {
+        def map(tuple: (Text, VectorWritable)): (String, Vector) = {
+          (unwrapKey(tuple._1).asInstanceOf[String], tuple._2.get())
         }
       })
-      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[String]])
     } else throw new IllegalArgumentException(s"Unsupported DRM key type:${keyClass.getName}")
 
   }
@@ -124,7 +124,6 @@ object FlinkEngine extends DistributedEngine {
     implicit val typeInformation = generateTypeInformation[K]
     val drm = flinkTranslate(plan)
     val newcp = new CheckpointedFlinkDrm(ds = drm.asRowWise.ds, _nrow = plan.nrow, _ncol = plan.ncol)
-    // newcp.ds.getExecutionEnvironment.createProgramPlan("plan")
     newcp.cache()
   }
 
@@ -135,7 +134,6 @@ object FlinkEngine extends DistributedEngine {
       case OpAtAnyKey(_) ⇒
         throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
       case op@OpAx(a, x) ⇒
-        //implicit val typeInformation = generateTypeInformation[K]
         FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a))
       case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAt.sparseTrick(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
       case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒
@@ -180,11 +178,9 @@ object FlinkEngine extends DistributedEngine {
         FlinkOpRowRange.slice(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
       case op@OpABAnyKey(a, b) if a.keyClassTag != b.keyClassTag ⇒
         throw new IllegalArgumentException("DRMs A and B have different indices, cannot multiply them")
-      case op: OpMapBlock[K, _] ⇒
-        FlinkOpMapBlock.apply(flinkTranslate(op.A), op.ncol, op).asInstanceOf[FlinkDrm[K]]
-      case cp: CheckpointedFlinkDrm[K] ⇒
-        //implicit val ktag=cp.keyClassTag
-        new RowsFlinkDrm[K](cp.ds, cp.ncol)
+      case op: OpMapBlock[_, K] ⇒
+        FlinkOpMapBlock.apply(flinkTranslate(op.A), op.ncol, op)
+      case cp: CheckpointedDrm[K] ⇒ cp
       case _ ⇒
         throw new NotImplementedError(s"operator $oper is not implemented yet")
     }
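
A side note on the OpMapBlock case above (illustrative, not from the patch): mapBlock may change the key type, so the operator's second type parameter is the key type of the result, and a plan producing keys of type K therefore matches OpMapBlock[_, K]. For example, re-keying an Int-keyed DRM to String keys inside the block map function yields an OpMapBlock[Int, String]:

    // Re-key from Int to String keys; assumes the DSL imports and context from the earlier sketch.
    val intKeyed = drmParallelize(dense((1, 2), (3, 4)), numPartitions = 2)
    val stringKeyed = intKeyed.mapBlock() { case (keys, block) =>
      keys.map(_.toString) -> block
    }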

http://git-wip-us.apache.org/repos/asf/mahout/blob/f4f42ae4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
index c3918a5..ec4769a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
@@ -31,6 +31,7 @@ import org.apache.mahout.math.scalabindings.RLikeOps._
 object FlinkOpMapBlock {
 
   def apply[S, R: TypeInformation](src: FlinkDrm[S], ncol: Int, operator: OpMapBlock[S,R]): FlinkDrm[R] = {
+
     implicit val rtag = operator.keyClassTag
     val bmf = operator.bmf
     val ncol = operator.ncol
@@ -39,7 +40,6 @@ object FlinkOpMapBlock {
         val result = bmf(block)
         assert(result._2.nrow == block._2.nrow, "block mapping must return same number of rows.")
         assert(result._2.ncol == ncol, s"block map must return $ncol number of columns.")
-       // printf("Block partition: \n%s\n", block._2)
         result
     }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/f4f42ae4/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 84b327a..6f1ba9f 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -45,6 +45,8 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
   lazy val nrow: Long = if (_nrow >= 0) _nrow else dim._1
   lazy val ncol: Int = if (_ncol >= 0) _ncol else dim._2
 
+  var cacheFileName:String = "/tmp/a"
+
   private lazy val dim: (Long, Int) = {
     // combine computation of ncol and nrow in one pass
 
@@ -67,8 +69,10 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
   override val keyClassTag: ClassTag[K] = classTag[K]
 
   def cache() = {
-    // TODO
-    this
+    cacheFileName = System.nanoTime().toString
+    implicit val context = new FlinkDistributedContext(ds.getExecutionEnvironment)
+    dfsWrite("/tmp/" + cacheFileName)
+    drmDfsRead("/tmp/" + cacheFileName).asInstanceOf[CheckpointedDrm[K]]
   }
 
   def uncache() = {
@@ -81,8 +85,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
   protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows
 
   def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = {
-
-     this
+    this
   }
 
   def collect: Matrix = {
@@ -127,25 +130,6 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
   def dfsWrite(path: String): Unit = {
     val env = ds.getExecutionEnvironment
 
-    // ds.map is not picking up the correct runtime value of tuple._1
-    // WritableType info is throwing an exception
-    // when asserting that the key is not an actual Writable
-    // rather a subclass
-
-//    val keyTag = implicitly[ClassTag[K]]
-//    def convertKey = keyToWritableFunc(keyTag)
-//    val writableDataset = ds.map {
-//      tuple => (convertKey(tuple._1), new VectorWritable(tuple._2))
-//    }
-
-
-      // test output with IntWritable Key.  VectorWritable is not a problem,
-//    val writableDataset = ds.map(new MapFunction[DrmTuple[K], (IntWritable, VectorWritable)] {
-//      def map(tuple: DrmTuple[K]): (IntWritable, VectorWritable) =
-//         (new IntWritable(1), new VectorWritable(tuple._2))
-//    })
-
-
     val keyTag = implicitly[ClassTag[K]]
 
     val job = new JobConf
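
For reference, a rough sketch (not the patch itself; path handling simplified) of the DFS round-trip that the reworked cache() now performs, writing the DRM to a temporary location and reading it back as a fresh checkpointed DRM:

    // Illustrative only; assumes the DSL imports and an implicit distributed context as above.
    val drmA = drmParallelize(dense((1, 2), (3, 4)), numPartitions = 2)
    val tmpPath = "/tmp/" + System.nanoTime().toString
    drmA.dfsWrite(tmpPath)                  // materialize the dataset
    val drmACached = drmDfsRead(tmpPath)    // re-read it as a checkpointed DRM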

