mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smar...@apache.org
Subject [08/50] [abbrv] mahout git commit: Persist only if the dataset has not been cached. Otherwise read back in already cached dataset
Date Mon, 11 Apr 2016 08:09:35 GMT
Persist only if the dataset has not been cached.  Otherwise read back in already cached dataset


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9c5ee592
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9c5ee592
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9c5ee592

Branch: refs/heads/master
Commit: 9c5ee59214a454f7ae25c762bf04bb30bd7982c8
Parents: a1cf7cf
Author: Andrew Palumbo <apalumbo@apache.org>
Authored: Fri Mar 25 18:10:18 2016 -0400
Committer: Andrew Palumbo <apalumbo@apache.org>
Committed: Fri Mar 25 18:10:18 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      |  4 +--
 .../drm/CheckpointedFlinkDrm.scala              | 32 +++++++++++++++-----
 2 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/9c5ee592/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index af508b3..0640ebe 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -359,9 +359,9 @@ object FlinkEngine extends DistributedEngine {
   }
 
   def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
-    implicit val tag = ClassTag[K]
+    implicit val ktag = classTag[K]
 
-    generateTypeInformationFromTag(tag)
+    generateTypeInformationFromTag(ktag)
   }
 
   private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/9c5ee592/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index ea96e88..65acbd6 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -77,14 +77,17 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   override val keyClassTag: ClassTag[K] = classTag[K]
 
   def cache() = {
+    implicit val typeInformation = createTypeInformation[(K,Vector)]
+    implicit val inputFormat = (ds.getType)
     if (!isCached) {
-      cacheFileName = System.nanoTime().toString
+      cacheFileName = persistanceRootDir + System.nanoTime().toString
       parallelismDeg = ds.getParallelism
       isCached = true
+      persist(ds, cacheFileName)
     }
-    implicit val typeInformation = createTypeInformation[(K,Vector)]
 
-    val _ds = persist(ds, persistanceRootDir + cacheFileName)
+    val _ds = readPersistedDataSet(cacheFileName, ds)
+
     datasetWrap(_ds)
   }
 
@@ -99,12 +102,10 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
     * @param dataset [[DataSet]] to write to disk
     * @param path File path to write dataset to
     * @tparam T Type of the [[DataSet]] elements
-    * @return [[DataSet]] reading the just written file
     */
-  def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T] = {
+  def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): Unit = {
     val env = dataset.getExecutionEnvironment
     val outputFormat = new TypeSerializerOutputFormat[T]
-
     val filePath = new Path(path)
 
     outputFormat.setOutputFilePath(filePath)
@@ -112,14 +113,29 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
 
     dataset.output(outputFormat)
     env.execute("FlinkTools persist")
+  }
+
+  /** Reads a [[DataSet]] from the specified path and returns it as a DataSource for subsequent
+    * operations.
+    *
+    * @param path File path to read dataset from
+    * @param ds persisted ds to retrieve type information and environment from
+    * @tparam T key Type of the [[DataSet]] elements
+    * @return [[DataSet]] reading the just written file
+    */
+  def readPersistedDataSet[T: ClassTag : TypeInformation]
+       (path: String, ds: DataSet[T]): DataSet[T] = {
 
-    val inputFormat = new TypeSerializerInputFormat[T](dataset.getType)
+    val env = ds.getExecutionEnvironment
+    val inputFormat = new TypeSerializerInputFormat[T](ds.getType())
+    val filePath = new Path(path)
     inputFormat.setFilePath(filePath)
 
     env.createInput(inputFormat)
   }
 
-  // Members declared in org.apache.mahout.math.drm.DrmLike   
+
+  // Members declared in org.apache.mahout.math.drm.DrmLike
 
   protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows
 


Mime
View raw message