mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apalu...@apache.org
Subject mahout git commit: MAHOUT-1810: Use method taken from FlinkMLTools for CheckpointedFlinkDrm cache persistance closes apache/mahout#201
Date Tue, 22 Mar 2016 23:15:41 GMT
Repository: mahout
Updated Branches:
  refs/heads/flink-binding e06fb119d -> 202b94f84


MAHOUT-1810: Use method taken from FlinkMLTools for CheckpointedFlinkDrm cache persistance
closes apache/mahout#201


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/202b94f8
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/202b94f8
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/202b94f8

Branch: refs/heads/flink-binding
Commit: 202b94f840286d4d0970f0427122697ba27fc1fb
Parents: e06fb11
Author: Andrew Palumbo <apalumbo@apache.org>
Authored: Tue Mar 22 19:14:57 2016 -0400
Committer: Andrew Palumbo <apalumbo@apache.org>
Committed: Tue Mar 22 19:14:57 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      |  7 ++-
 .../drm/CheckpointedFlinkDrm.scala              | 51 +++++++++++++++++---
 .../apache/mahout/flinkbindings/package.scala   |  1 +
 3 files changed, 51 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/202b94f8/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index e606514..843a4a9 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -357,12 +357,12 @@ object FlinkEngine extends DistributedEngine {
     res.collect().head
   }
 
-  private def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
+  def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
     val tag = implicitly[ClassTag[K]]
 
     generateTypeInformationFromTag(tag)
   }
-  
+
   private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = {
     if (tag.runtimeClass.equals(classOf[Int])) {
       createTypeInformation[Int].asInstanceOf[TypeInformation[K]]
@@ -376,4 +376,7 @@ object FlinkEngine extends DistributedEngine {
       throw new IllegalArgumentException(s"index type $tag is not supported")
     }
   }
+  object FlinkEngine {
+
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/202b94f8/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index a6b267b..ea96e88 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -19,7 +19,11 @@
 package org.apache.mahout.flinkbindings.drm
 
 import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
 import org.apache.flink.api.scala._
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.core.fs.Path
 import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
 import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
@@ -34,7 +38,7 @@ import scala.collection.JavaConverters._
 import scala.reflect.{ClassTag, classTag}
 import scala.util.Random
 
-class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
+class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
       private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN,
       private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN,
       override val cacheHint: CacheHint = CacheHint.NONE,
@@ -45,7 +49,11 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
   lazy val nrow: Long = if (_nrow >= 0) _nrow else dim._1
   lazy val ncol: Int = if (_ncol >= 0) _ncol else dim._2
 
-  var cacheFileName:String = "/tmp/a"
+  // persistance values
+  var cacheFileName: String = "/a"
+  var isCached: Boolean = false
+  var parallelismDeg: Int = -1
+  val persistanceRootDir = "/tmp/"
 
   private lazy val dim: (Long, Int) = {
     // combine computation of ncol and nrow in one pass
@@ -69,10 +77,15 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
   override val keyClassTag: ClassTag[K] = classTag[K]
 
   def cache() = {
-    cacheFileName = System.nanoTime().toString
-    implicit val context = new FlinkDistributedContext(ds.getExecutionEnvironment)
-    dfsWrite("/tmp/" + cacheFileName)
-    drmDfsRead("/tmp/" + cacheFileName).asInstanceOf[CheckpointedDrm[K]]
+    if (!isCached) {
+      cacheFileName = System.nanoTime().toString
+      parallelismDeg = ds.getParallelism
+      isCached = true
+    }
+    implicit val typeInformation = createTypeInformation[(K,Vector)]
+
+    val _ds = persist(ds, persistanceRootDir + cacheFileName)
+    datasetWrap(_ds)
   }
 
   def uncache() = {
@@ -80,6 +93,32 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
     this
   }
 
+  /** Writes a [[DataSet]] to the specified path and returns it as a DataSource for subsequent
+    * operations.
+    *
+    * @param dataset [[DataSet]] to write to disk
+    * @param path File path to write dataset to
+    * @tparam T Type of the [[DataSet]] elements
+    * @return [[DataSet]] reading the just written file
+    */
+  def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T]
= {
+    val env = dataset.getExecutionEnvironment
+    val outputFormat = new TypeSerializerOutputFormat[T]
+
+    val filePath = new Path(path)
+
+    outputFormat.setOutputFilePath(filePath)
+    outputFormat.setWriteMode(WriteMode.OVERWRITE)
+
+    dataset.output(outputFormat)
+    env.execute("FlinkTools persist")
+
+    val inputFormat = new TypeSerializerInputFormat[T](dataset.getType)
+    inputFormat.setFilePath(filePath)
+
+    env.createInput(inputFormat)
+  }
+
   // Members declared in org.apache.mahout.math.drm.DrmLike   
 
   protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows

http://git-wip-us.apache.org/repos/asf/mahout/blob/202b94f8/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index b083752..f0dd620 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -99,6 +99,7 @@ package object flinkbindings {
   }
 
   def datasetWrap[K: ClassTag](dataset: DataSet[(K, Vector)]): CheckpointedDrm[K] = {
+    implicit val typeInformation = FlinkEngine.generateTypeInformation[K]
     new CheckpointedFlinkDrm[K](dataset)
   }
 


Mime
View raw message